Add a DagRun and TaskInstance event listener under the Kafka provider#68082
xBis7 wants to merge 1 commit into
I am confused by the use case here. Shouldn't this be an Asset Watcher?
@vikramkoka No, but it could be used together with an Asset Watcher. An Asset Watcher is essentially on the consumer side of events, while this PR is on the producer side. The new listener produces a message to a Kafka topic for every state change event. We can consume the messages from the topic either with the existing consumer hook from the Kafka provider or with an Asset Watcher. For example, team A, with Airflow installation A, triggers dag1 and has enabled the listener, which publishes an event for every state change to a Kafka topic.
Team B, with Airflow installation B, is monitoring the same Kafka topic and has a deferred task that waits to run until it consumes the message.
This PR adds a listener under the Kafka provider that implements the DagRun and TaskInstance state change event hooks.
The listener publishes a message to a pre-existing Kafka topic for every event. Each message has some extra metadata for the dag or task that the event belongs to.
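As a rough illustration of what such a per-event message could carry, here is a minimal sketch that serializes one state change to JSON. The function name and every field name (`event_type`, `run_id`, `timestamp`, and so on) are assumptions made for this example, not the schema used in the PR.

```python
import json
from datetime import datetime, timezone
from typing import Optional


def build_event_message(
    event_type: str,
    dag_id: str,
    run_id: str,
    state: str,
    source: str,
    task_id: Optional[str] = None,
) -> bytes:
    """Serialize one state-change event to JSON bytes (hypothetical schema)."""
    payload = {
        "event_type": event_type,  # assumed values: "dag_run" or "task_instance"
        "dag_id": dag_id,
        "run_id": run_id,
        "state": state,
        "source": source,  # identifies the Airflow installation that emitted the event
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    # Only task instance events carry a task_id in this sketch.
    if task_id is not None:
        payload["task_id"] = task_id
    return json.dumps(payload, sort_keys=True).encode("utf-8")
```

The resulting bytes could then be handed to whatever Kafka producer the listener uses as the message value.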
DagRun events and TaskInstance events are separated and guarded behind different config flags. Both flags are disabled by default because each listener adds load to Airflow that users might not want.
The listener expects that the user has already created the topic and set it in the listener's config section. If the topic doesn't exist, the listener doesn't fail. Instead, it logs a warning and checks for the topic again after a configured interval.
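The "warn and re-check after an interval" behaviour can be sketched roughly as below. This is not the PR's code: `TopicGate`, `topic_exists`, and `recheck_interval` are hypothetical names, and the broker lookup is injected as a plain callable so the sketch stays independent of any Kafka client. It also assumes that once the topic has been seen, it is not checked again.

```python
import logging
import time
from typing import Callable, Optional

log = logging.getLogger(__name__)


class TopicGate:
    """Remembers a failed topic lookup and skips re-checking until the interval passes."""

    def __init__(
        self,
        topic_exists: Callable[[], bool],  # hypothetical probe, e.g. a metadata lookup
        recheck_interval: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._topic_exists = topic_exists
        self._interval = recheck_interval
        self._clock = clock
        self._available = False
        self._next_check: Optional[float] = None

    def ready(self) -> bool:
        """Return True if events can be published; never raises on a missing topic."""
        if self._available:
            return True
        now = self._clock()
        if self._next_check is not None and now < self._next_check:
            return False  # still inside the wait window, so skip the broker call
        self._available = self._topic_exists()
        if not self._available:
            log.warning("Kafka topic not found; checking again in %s seconds", self._interval)
            self._next_check = now + self._interval
        return self._available
```

An event hook would call `ready()` first and drop the event when it returns `False`, so a missing topic never breaks a running dag.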
Users can also filter events for dag runs based on dag_id and for tasks based on dag_id + task_id. For example, someone could choose to get only dag run state events for a particular dag and nothing else.
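The filtering rule above can be sketched as two small predicates. The function names and the shape of the filter values are assumptions, as is the choice that an unset filter (`None`) means "publish everything"; the PR's actual config format may differ.

```python
from typing import AbstractSet, Optional, Tuple


def should_publish_dag_run(dag_id: str, allowed_dag_ids: Optional[AbstractSet[str]]) -> bool:
    """Dag run events are filtered on dag_id only; None means no filter is set."""
    return allowed_dag_ids is None or dag_id in allowed_dag_ids


def should_publish_task(
    dag_id: str,
    task_id: str,
    allowed_tasks: Optional[AbstractSet[Tuple[str, str]]],
) -> bool:
    """Task events are filtered on the (dag_id, task_id) pair; None means no filter is set."""
    return allowed_tasks is None or (dag_id, task_id) in allowed_tasks
```

For instance, a filter of `{"dag1"}` for dag runs would publish state events for `dag1` and nothing for any other dag.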
By adding a listener to the provider we can have dags consume from the Kafka topic and react to messages. When a particular message arrives, a deferred task could be triggered. This could also work for dags across multiple Airflow installations, all sharing the common topic. For that reason, I also added a `source` config key, so that users can distinguish where the messages came from.
The listener follows the same format as the OpenLineage one:
https://github.com/apache/airflow/blob/main/providers/openlineage/src/airflow/providers/openlineage/plugins/listener.py
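On the consuming side, a dag in another installation needs to decide whether a message on the shared topic is the one it is waiting for. A minimal sketch of that check, assuming JSON messages with `source`, `dag_id`, and `state` keys (hypothetical field names, not confirmed by the PR), might look like this:

```python
import json
from typing import Any, Dict, Optional


def parse_matching_event(
    raw: bytes,
    expected_source: str,
    dag_id: str,
    state: str,
) -> Optional[Dict[str, Any]]:
    """Return the decoded event if it matches the expected source, dag, and state."""
    try:
        event = json.loads(raw)
    except ValueError:
        return None  # not valid JSON (or not valid UTF-8), so ignore the message
    if not isinstance(event, dict):
        return None
    wanted = {"source": expected_source, "dag_id": dag_id, "state": state}
    if all(event.get(key) == value for key, value in wanted.items()):
        return event
    return None
```

A function like this could serve as the message filter for a deferred task or an Asset Watcher: it returns the event when team A's `dag1` reaches the desired state and `None` for everything else on the topic.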
Testing
I've added unit tests and also a simple integration test that uses an actual broker. I've also tested the changes manually.
Was generative AI tooling used to co-author this PR?
Claude Code, Opus 4.7