Skip to content

Commit

Permalink
Add DagRun state change to the Listener plugin system(#27113)
Browse files Browse the repository at this point in the history
This PR expands listeners API to be notified about DagRun state changes.

PR #20443 introduced Listener API to Airflow, enabling users and developers
to write Pluggy plugins. Here the same mechanism is used to provide
notifications about DagRun states - whether DagRun started, succeeded or
failed.

Additionally, this PR adds lifecycle methods for plugins - when Airflow Job - 
whether SchedulerJob, BackfillJob or LocalTaskJob runs, it notifies plugin that it
starts or finishes.

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski committed Nov 22, 2022
1 parent aaf5fd6 commit 035315f
Show file tree
Hide file tree
Showing 19 changed files with 493 additions and 32 deletions.
2 changes: 2 additions & 0 deletions airflow/jobs/backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,8 @@ def _get_dag_run(self, dagrun_info: DagRunInfo, dag: DAG, session: Session = Non
run.state = DagRunState.RUNNING
run.run_type = DagRunType.BACKFILL_JOB
run.verify_integrity(session=session)

run.notify_dagrun_state_changed(msg="started")
return run

@provide_session
Expand Down
3 changes: 3 additions & 0 deletions airflow/jobs/base_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.executors.executor_loader import ExecutorLoader
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.stats import Stats
from airflow.utils import timezone
Expand Down Expand Up @@ -110,6 +111,7 @@ def __init__(self, executor=None, heartrate=None, *args, **kwargs):
self.heartrate = heartrate
self.unixname = getuser()
self.max_tis_per_query: int = conf.getint("scheduler", "max_tis_per_query")
get_listener_manager().hook.on_starting(component=self)
super().__init__(*args, **kwargs)

@cached_property
Expand Down Expand Up @@ -252,6 +254,7 @@ def run(self):
self.state = State.FAILED
raise
finally:
get_listener_manager().hook.before_stopping(component=self)
self.end_date = timezone.utcnow()
session.merge(self)
session.commit()
Expand Down
4 changes: 2 additions & 2 deletions airflow/jobs/scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,6 @@ def _update_state(dag: DAG, dag_run: DagRun):
Stats.timing(f"dagrun.schedule_delay.{dag.dag_id}", schedule_delay)

for dag_run in dag_runs:

dag = dag_run.dag = self.dagbag.get_dag(dag_run.dag_id, session=session)
if not dag:
self.log.error("DAG '%s' not found in serialized_dag table", dag_run.dag_id)
Expand All @@ -1230,6 +1229,7 @@ def _update_state(dag: DAG, dag_run: DagRun):
else:
active_runs_of_dags[dag_run.dag_id] += 1
_update_state(dag, dag_run)
dag_run.notify_dagrun_state_changed()

@retry_db_transaction
def _schedule_all_dag_runs(self, guard, dag_runs, session):
Expand Down Expand Up @@ -1294,6 +1294,7 @@ def _schedule_dag_run(
msg="timed_out",
)

dag_run.notify_dagrun_state_changed()
return callback_to_execute

if dag_run.execution_date > timezone.utcnow() and not dag.allow_future_exec_dates:
Expand Down Expand Up @@ -1518,7 +1519,6 @@ def _find_zombies(self, session: Session) -> None:
self.log.warning("Failing (%s) jobs without heartbeat after %s", len(zombies), limit_dttm)

for ti, file_loc in zombies:

zombie_message_details = self._generate_zombie_message_details(ti)
request = TaskCallbackRequest(
full_filepath=file_loc,
Expand Down
9 changes: 4 additions & 5 deletions airflow/listeners/listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
from __future__ import annotations

import logging
from types import ModuleType
from typing import TYPE_CHECKING

import pluggy
Expand All @@ -38,10 +37,12 @@ class ListenerManager:
"""Manage listener registration and provides hook property for calling them."""

def __init__(self):
from airflow.listeners import spec
from airflow.listeners.spec import dagrun, lifecycle, taskinstance

self.pm = pluggy.PluginManager("airflow")
self.pm.add_hookspecs(spec)
self.pm.add_hookspecs(lifecycle)
self.pm.add_hookspecs(dagrun)
self.pm.add_hookspecs(taskinstance)

@property
def has_listeners(self) -> bool:
Expand All @@ -53,8 +54,6 @@ def hook(self) -> _HookRelay:
return self.pm.hook

def add_listener(self, listener):
if not isinstance(listener, ModuleType):
raise TypeError("Listener %s is not module", str(listener))
if self.pm.is_registered(listener):
return
self.pm.register(listener)
Expand Down
16 changes: 16 additions & 0 deletions airflow/listeners/spec/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
42 changes: 42 additions & 0 deletions airflow/listeners/spec/dagrun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from pluggy import HookspecMarker

if TYPE_CHECKING:
from airflow.models.dagrun import DagRun

hookspec = HookspecMarker("airflow")


@hookspec
def on_dag_run_running(dag_run: DagRun, msg: str):
"""Called when dag run state changes to RUNNING."""


@hookspec
def on_dag_run_success(dag_run: DagRun, msg: str):
"""Called when dag run state changes to SUCCESS."""


@hookspec
def on_dag_run_failed(dag_run: DagRun, msg: str):
"""Called when dag run state changes to FAIL."""
44 changes: 44 additions & 0 deletions airflow/listeners/spec/lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pluggy import HookspecMarker

hookspec = HookspecMarker("airflow")


@hookspec
def on_starting(component):
"""
Called before Airflow component - jobs like scheduler, worker, or task runner starts.
It's guaranteed this will be called before any other plugin method.
:param component: Component that calls this method
"""


@hookspec
def before_stopping(component):
"""
Called before Airflow component - jobs like scheduler, worker, or task runner stops.
It's guaranteed this will be called after any other plugin method.
:param component: Component that calls this method
"""
File renamed without changes.
22 changes: 20 additions & 2 deletions airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.configuration import conf as airflow_conf
from airflow.exceptions import AirflowException, RemovedInAirflow3Warning, TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models.abstractoperator import NotMapped
from airflow.models.base import Base, StringID
from airflow.models.expandinput import NotFullyPopulated
Expand Down Expand Up @@ -516,8 +517,8 @@ def update_state(
:param session: Sqlalchemy ORM Session
:param execute_callbacks: Should dag callbacks (success/failure, SLA etc) be invoked
directly (default: true) or recorded as a pending request in the ``callback`` property
:return: Tuple containing tis that can be scheduled in the current loop & `callback` that
directly (default: true) or recorded as a pending request in the ``returned_callback`` property
:return: Tuple containing tis that can be scheduled in the current loop & `returned_callback` that
needs to be executed
"""
# Callback to execute in case of Task Failures
Expand Down Expand Up @@ -571,6 +572,8 @@ def recalculate(self) -> _UnfinishedStates:
if not unfinished.tis and any(leaf_ti.state in State.failed_states for leaf_ti in leaf_tis):
self.log.error("Marking run %s failed", self)
self.set_state(DagRunState.FAILED)
self.notify_dagrun_state_changed(msg="task_failure")

if execute_callbacks:
dag.handle_callback(self, success=False, reason="task_failure", session=session)
elif dag.has_on_failure_callback:
Expand All @@ -590,6 +593,8 @@ def recalculate(self) -> _UnfinishedStates:
elif not unfinished.tis and all(leaf_ti.state in State.success_states for leaf_ti in leaf_tis):
self.log.info("Marking run %s successful", self)
self.set_state(DagRunState.SUCCESS)
self.notify_dagrun_state_changed(msg="success")

if execute_callbacks:
dag.handle_callback(self, success=True, reason="success", session=session)
elif dag.has_on_success_callback:
Expand All @@ -609,6 +614,8 @@ def recalculate(self) -> _UnfinishedStates:
elif unfinished.should_schedule and not are_runnable_tasks:
self.log.error("Task deadlock (no runnable tasks); marking run %s failed", self)
self.set_state(DagRunState.FAILED)
self.notify_dagrun_state_changed(msg="all_tasks_deadlocked")

if execute_callbacks:
dag.handle_callback(self, success=False, reason="all_tasks_deadlocked", session=session)
elif dag.has_on_failure_callback:
Expand Down Expand Up @@ -711,6 +718,17 @@ def _filter_tis_and_exclude_removed(dag: DAG, tis: list[TI]) -> Iterable[TI]:
finished_tis=finished_tis,
)

def notify_dagrun_state_changed(self, msg: str = ""):
if self.state == DagRunState.RUNNING:
get_listener_manager().hook.on_dag_run_running(dag_run=self, msg=msg)
elif self.state == DagRunState.SUCCESS:
get_listener_manager().hook.on_dag_run_success(dag_run=self, msg=msg)
elif self.state == DagRunState.FAILED:
get_listener_manager().hook.on_dag_run_failed(dag_run=self, msg=msg)
# deliberately not notifying on QUEUED
# we can't get all the state changes on SchedulerJob, BackfillJob
# or LocalTaskJob, so we don't want to "falsely advertise" we notify about that

def _get_ready_tis(
self,
schedulable_tis: list[TI],
Expand Down
41 changes: 30 additions & 11 deletions docs/apache-airflow/listeners.rst
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,40 @@ Listeners
Airflow gives you an option to be notified of events happening in Airflow
by writing listeners. Listeners are powered by `pluggy <https://pluggy.readthedocs.io/en/stable/>`__

Right now Airflow exposes few types of events.

Lifecycle events
^^^^^^^^^^^^^^^^
Those events - ``on_starting`` and ``before_stopping`` allow you to react to
lifecycle to an Airflow ``Job``, like ``SchedulerJob`` or ``BackfillJob``.

TaskInstance state change events
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Those events - ``on_task_instance_running``, ``on_task_instance_success`` and ``on_task_instance_failed``
once ``TaskInstance`` state changes to one of the respective states. This generally happens on ``LocalTaskJob``.

DagRun state change events
^^^^^^^^^^^^^^^^^^^^^^^^^^
Those events - ``on_dag_run_running``, ``on_dag_run_success`` and ``on_dag_run_failed``
once ``DagRun`` state changes to one of the respective states. This generally happens on ``SchedulerJob`` or ``BackfillJob``.

Usage
=====

To create a listener you will need to derive the import
``airflow.listeners.hookimpl`` and implement the ``hookimpls`` for
events you want to be notified at.

Their specification is defined as ``hookspec`` in ``airflow/listeners/spec`` directory.
Your implementation needs to accept the same named parameters as defined in hookspec, or Pluggy will complain about your plugin.
On the other hand, you don't need to implement every method - it's perfectly fine to have a listener that implements just one method, or any subset of methods.

To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`

Listener API is meant to be called across all dags, and all operators - in contrast to methods like
``on_success_callback``, ``pre_execute`` and related family which are meant to provide callbacks
for particular dag authors, or operator creators. There is no possibility to listen on events generated
by particular dag.

To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`

|experimental|

Interface
---------

To create a listener you will need to derive the
create python module, import ``airflow.listeners.hookimpl`` and implement the ``hookimpls`` for
events you want to be notified at.

Right now Airflow exposes TaskInstance state change events.
Their specification is defined as ``hookspec`` in ``airflow/listeners/spec.py`` file.
3 changes: 3 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,8 @@ hoc
homebrew
honoured
hookable
hookimpl
hookspec
HostAliases
Hostname
hostname
Expand Down Expand Up @@ -1056,6 +1058,7 @@ pkill
plaintext
platformVersion
pluggable
pluggy
plyvel
png
podName
Expand Down
29 changes: 29 additions & 0 deletions tests/jobs/test_backfill_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import json
import logging
import threading
from unittest import mock
from unittest.mock import patch

import pytest
Expand All @@ -36,6 +37,7 @@
TaskConcurrencyLimitReached,
)
from airflow.jobs.backfill_job import BackfillJob
from airflow.listeners.listener import get_listener_manager
from airflow.models import DagBag, Pool, TaskInstance as TI
from airflow.models.dagrun import DagRun
from airflow.models.serialized_dag import SerializedDagModel
Expand All @@ -48,6 +50,7 @@
from airflow.utils.timeout import timeout
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.types import DagRunType
from tests.listeners import dag_listener
from tests.models import TEST_DAGS_FOLDER
from tests.test_utils.db import (
clear_db_dags,
Expand Down Expand Up @@ -974,6 +977,32 @@ def test_backfill_max_limit_check_within_limit(self, dag_maker):
assert 2 == len(dagruns)
assert all(run.state == State.SUCCESS for run in dagruns)

def test_backfill_notifies_dagrun_listener(self, dag_maker):
dag = self._get_dummy_dag(dag_maker)
dag_run = dag_maker.create_dagrun(state=None)
dag_listener.clear()
get_listener_manager().add_listener(dag_listener)

start_date = DEFAULT_DATE - datetime.timedelta(hours=1)
end_date = DEFAULT_DATE

executor = MockExecutor()
job = BackfillJob(
dag=dag, start_date=start_date, end_date=end_date, executor=executor, donot_pickle=True
)
job.notification_threadpool = mock.MagicMock()
job.run()

assert len(dag_listener.running) == 1
assert len(dag_listener.success) == 1
assert dag_listener.running[0].dag.dag_id == dag_run.dag.dag_id
assert dag_listener.running[0].run_id == dag_run.run_id
assert dag_listener.running[0].state == DagRunState.RUNNING

assert dag_listener.success[0].dag.dag_id == dag_run.dag.dag_id
assert dag_listener.success[0].run_id == dag_run.run_id
assert dag_listener.success[0].state == DagRunState.SUCCESS

def test_backfill_max_limit_check(self, dag_maker):
dag_id = "test_backfill_max_limit_check"
run_id = "test_dag_run"
Expand Down

0 comments on commit 035315f

Please sign in to comment.