Skip to content

Commit

Permalink
Add Listener Plugin API that tracks TaskInstance state changes (#20443)
Browse files Browse the repository at this point in the history
This adds new Plugin API - "listeners". It enables plugin authors to write
[pluggy hook implementation][1] that will be called on certain formalized extension
points. To differentiate between current Airflow extension points, like
plugins, and current Airflow hooks, implementations of those hooks are called
listeners.

The API is ment to be called across all dags, and all operators - in contrast
to current on_success_callback, pre_execute and related family which are meant
to provide callbacks for particular dag authors, or operator creators.

pluggy mechanism enables us to execute multiple, or none, listeners that
implement particular extension point, so that users can use multiple listeners
seamlessly.

In this PR, three such extension points are added. When TaskInstance's state is
changed to RUNNING, on_task_instance_running hook is called. On change
toSUCCESS on_task_instance_success is called, similarly on FAILED
on_task_instance_failed is called.

Actual notification mechanism is be implemented using [SQLAlchemy’s events
mechanism][2]. This ensures that plugins will get every change of state,
regardless of where in the codebase it happened, and not require manual
annotation of TI state changes across the codebase.

To make sure that this change is not affecting performance, running this
mechanism on scheduler is disabled by default. The SQLAlchemy event mechanism
is also not affected by default - the event listener is only added if we have
any plugin which actually provides any listener.

[1]: https://pluggy.readthedocs.io/en/stable/
[2]: https://docs.sqlalchemy.org/en/13/orm/session_events.html#after-flush

Signed-off-by: Maciej Obuchowski <obuchowski.maciej@gmail.com>
  • Loading branch information
mobuchowski authored Jan 13, 2022
1 parent ce06e6b commit dba00ce
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 0 deletions.
12 changes: 12 additions & 0 deletions airflow/jobs/local_task_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.jobs.base_job import BaseJob
from airflow.listeners.events import register_task_instance_state_events
from airflow.listeners.listener import get_listener_manager
from airflow.models.dagrun import DagRun
from airflow.models.taskinstance import TaskInstance
from airflow.sentry import Sentry
Expand Down Expand Up @@ -74,6 +76,7 @@ def __init__(
super().__init__(*args, **kwargs)

def _execute(self):
self._enable_task_listeners()
self.task_runner = get_task_runner(self)

def signal_handler(signum, frame):
Expand Down Expand Up @@ -291,3 +294,12 @@ def _update_dagrun_state_for_paused_dag(self, session=None):
if dag_run:
dag_run.dag = dag
dag_run.update_state(session=session, execute_callbacks=True)

@staticmethod
def _enable_task_listeners():
"""
Check if we have any registered listeners, then register sqlalchemy hooks for
TI state change if we do.
"""
if get_listener_manager().has_listeners:
register_task_instance_state_events()
20 changes: 20 additions & 0 deletions airflow/listeners/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from pluggy import HookimplMarker

hookimpl = HookimplMarker("airflow")
79 changes: 79 additions & 0 deletions airflow/listeners/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging

from sqlalchemy import event
from sqlalchemy.orm import Session

from airflow.listeners.listener import get_listener_manager
from airflow.models import TaskInstance
from airflow.utils.state import State

_is_listening = False


def on_task_instance_state_session_flush(session, flush_context):
"""
Listens for session.flush() events that modify TaskInstance's state, and notify listeners that listen
for that event. Doing it this way enable us to be stateless in the SQLAlchemy event listener.
"""
logger = logging.getLogger(__name__)
if not get_listener_manager().has_listeners:
return
for state in flush_context.states:
if isinstance(state.object, TaskInstance) and session.is_modified(
state.object, include_collections=False
):
added, unchanged, deleted = flush_context.get_attribute_history(state, 'state')

logger.debug(
"session flush listener: added %s unchanged %s deleted %s - %s",
added,
unchanged,
deleted,
state.object,
)
if not added:
continue

previous_state = deleted[0] if deleted else State.NONE

if State.RUNNING in added:
get_listener_manager().hook.on_task_instance_running(
previous_state=previous_state, task_instance=state.object, session=session
)
elif State.FAILED in added:
get_listener_manager().hook.on_task_instance_failed(
previous_state=previous_state, task_instance=state.object, session=session
)
elif State.SUCCESS in added:
get_listener_manager().hook.on_task_instance_success(
previous_state=previous_state, task_instance=state.object, session=session
)


def register_task_instance_state_events():
global _is_listening
if not _is_listening:
event.listen(Session, 'after_flush', on_task_instance_state_session_flush)
_is_listening = True


def unregister_task_instance_state_events():
global _is_listening
event.remove(Session, 'after_flush', on_task_instance_state_session_flush)
_is_listening = False
71 changes: 71 additions & 0 deletions airflow/listeners/listener.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
from types import ModuleType
from typing import TYPE_CHECKING

import pluggy

from airflow.plugins_manager import integrate_listener_plugins

if TYPE_CHECKING:
from pluggy._hooks import _HookRelay

log = logging.getLogger(__name__)


_listener_manager = None


class ListenerManager:
"""Class that manages registration of listeners and provides hook property for calling them"""

def __init__(self):
from airflow.listeners import spec

self.pm = pluggy.PluginManager("airflow")
self.pm.add_hookspecs(spec)

@property
def has_listeners(self) -> bool:
return len(self.pm.get_plugins()) > 0

@property
def hook(self) -> "_HookRelay":
"""Returns hook, on which plugin methods specified in spec can be called."""
return self.pm.hook

def add_listener(self, listener):
if not isinstance(listener, ModuleType):
raise TypeError("Listener %s is not module", str(listener))
if self.pm.is_registered(listener):
return
self.pm.register(listener)

def clear(self):
"""Remove registered plugins"""
for plugin in self.pm.get_plugins():
self.pm.unregister(plugin)


def get_listener_manager() -> ListenerManager:
global _listener_manager
if not _listener_manager:
_listener_manager = ListenerManager()
integrate_listener_plugins(_listener_manager)
return _listener_manager
49 changes: 49 additions & 0 deletions airflow/listeners/spec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import TYPE_CHECKING, Optional

from pluggy import HookspecMarker

if TYPE_CHECKING:
from sqlalchemy.orm.session import Session

from airflow.models.taskinstance import TaskInstance
from airflow.utils.state import TaskInstanceState

hookspec = HookspecMarker("airflow")


@hookspec
def on_task_instance_running(
previous_state: "TaskInstanceState", task_instance: "TaskInstance", session: Optional["Session"]
):
"""Called when task state changes to RUNNING. Previous_state can be State.NONE."""


@hookspec
def on_task_instance_success(
previous_state: "TaskInstanceState", task_instance: "TaskInstance", session: Optional["Session"]
):
"""Called when task state changes to SUCCESS. Previous_state can be State.NONE."""


@hookspec
def on_task_instance_failed(
previous_state: "TaskInstanceState", task_instance: "TaskInstance", session: Optional["Session"]
):
"""Called when task state changes to FAIL. Previous_state can be State.NONE."""
23 changes: 23 additions & 0 deletions airflow/plugins_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@
except ImportError:
from importlib import metadata as importlib_metadata # type: ignore[no-redef]

from types import ModuleType

from airflow import settings
from airflow.utils.entry_points import entry_points_with_dist
from airflow.utils.file import find_path_from_directory
from airflow.utils.module_loading import as_importable_string

if TYPE_CHECKING:
from airflow.hooks.base import BaseHook
from airflow.listeners.listener import ListenerManager
from airflow.timetables.base import Timetable

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,6 +80,7 @@
"operator_extra_links",
"timetables",
"source",
"listeners",
}


Expand Down Expand Up @@ -153,6 +157,8 @@ class AirflowPlugin:
# A list of timetable classes that can be used for DAG scheduling.
timetables: List[Type["Timetable"]] = []

listeners: List[ModuleType] = []

@classmethod
def validate(cls):
"""Validates that plugin has a name."""
Expand Down Expand Up @@ -458,6 +464,20 @@ def integrate_macros_plugins() -> None:
setattr(macros, plugin.name, macros_module)


def integrate_listener_plugins(listener_manager: "ListenerManager") -> None:
global plugins

ensure_plugins_loaded()

if plugins:
for plugin in plugins:
if plugin.name is None:
raise AirflowPluginException("Invalid plugin name")

for listener in plugin.listeners:
listener_manager.add_listener(listener)


def get_plugin_info(attrs_to_dump: Optional[Iterable[str]] = None) -> List[Dict[str, Any]]:
"""
Dump plugins attributes
Expand All @@ -483,6 +503,9 @@ def get_plugin_info(attrs_to_dump: Optional[Iterable[str]] = None) -> List[Dict[
]
elif attr in ('macros', 'timetables', 'hooks', 'executors'):
info[attr] = [as_importable_string(d) for d in getattr(plugin, attr)]
elif attr == 'listeners':
# listeners are always modules
info[attr] = [d.__name__ for d in getattr(plugin, attr)]
elif attr == 'appbuilder_views':
info[attr] = [
{**d, 'view': as_importable_string(d['view'].__class__) if 'view' in d else None}
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ unit of work and continuity.
integration
kubernetes
lineage
listeners
dag-serialization
modules_management
Release Policies <release-process>
Expand Down
1 change: 1 addition & 0 deletions docs/apache-airflow/integration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ Airflow has a mechanism that allows you to expand its functionality and integrat
* :doc:`Metrics (statsd) </logging-monitoring/metrics>`
* :doc:`Operators and hooks </operators-and-hooks-ref>`
* :doc:`Plugins </plugins>`
* :doc:`Listeners </listeners>`
* :doc:`Secrets backends </security/secrets/secrets-backend/index>`
* :doc:`Tracking systems </logging-monitoring/tracking-user-activity>`
* :doc:`Web UI Authentication backends </security/api>`
Expand Down
41 changes: 41 additions & 0 deletions docs/apache-airflow/listeners.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
.. http://www.apache.org/licenses/LICENSE-2.0
.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
Listeners
=========

Airflow gives you an option to be notified of events happening in Airflow
by writing listeners. Listeners are powered by `pluggy <https://pluggy.readthedocs.io/en/stable/>`__

Listener API is meant to be called across all dags, and all operators - in contrast to methods like
``on_success_callback``, ``pre_execute`` and related family which are meant to provide callbacks
for particular dag authors, or operator creators. There is no possibility to listen on events generated
by particular dag.

To include listener in your Airflow installation, include it as a part of an :doc:`Airflow Plugin </plugins>`

|experimental|

Interface
---------

To create a listener you will need to derive the
create python module, import ``airflow.listeners.hookimpl`` and implement the ``hookimpls`` for
events you want to be notified at.

Right now Airflow exposes TaskInstance state change events.
Their specification is defined as ``hookspec`` in ``airflow/listeners/spec.py`` file.
5 changes: 5 additions & 0 deletions docs/apache-airflow/plugins.rst
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,11 @@ looks like:
# A list of timetable classes to register so they can be used in DAGs.
timetables = []
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
listeners = []
You can derive it by inheritance (please refer to the example below). In the example, all options have been
defined as class attributes, but you can also define them as properties if you need to perform
additional initialization. Please note ``name`` inside this class must be specified.
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ install_requires =
packaging>=14.0
pendulum~=2.0
pep562~=1.0;python_version<"3.7"
pluggy~=1.0
psutil>=4.2.0, <6.0.0
pygments>=2.0.1, <3.0
pyjwt<3
Expand Down
Loading

0 comments on commit dba00ce

Please sign in to comment.