InfluxDB 3: add a dedicated sensor (data freshness / windowed existence) #67109

@arpitrathore

Description

The InfluxDB 3 support added in #58929 provides InfluxDB3Hook and InfluxDB3Operator but no dedicated sensor. Today, users who need to gate downstream tasks on the arrival of data in an InfluxDB 3 measurement have to wrap InfluxDB3Hook in a PythonSensor. That works, but it lacks the discoverability, deferrable semantics, and consistent API of a dedicated sensor.

This issue proposes adding a dedicated sensor (exact class name and API to be decided in review) to the influxdb provider, following the pattern established by:

  • SqlSensor (airflow.providers.common.sql.sensors.sql). Generic SQL-truthy polling.
  • AwaitMessageSensor (airflow.providers.apache.kafka.sensors.kafka). Deferrable Kafka message wait.
  • BigQueryTablePartitionExistenceSensor (airflow.providers.google.cloud.sensors.bigquery). Partition existence check.
  • HivePartitionSensor and NamedHivePartitionSensor (airflow.providers.apache.hive.sensors.hive_partition).

Proposed API

Two complementary designs. Final naming and signatures are open for discussion in review.

Option A (recommended first cut): generic SQL-truthy sensor, modelled on SqlSensor. Smallest delivery, broadest applicability.

from airflow.providers.influxdb.sensors.influxdb3 import InfluxDB3Sensor

wait_for_data = InfluxDB3Sensor(
    task_id="wait_for_data",
    influxdb3_conn_id="influxdb3_ro",
    sql="""
        SELECT 1 FROM "events"
        WHERE time >= '{{ data_interval_start }}'
          AND time <  '{{ data_interval_end }}'
        LIMIT 1
    """,
    deferrable=True,
    poke_interval=60,
    timeout=3600,
)
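
As a rough illustration of what "SQL-truthy" could mean for Option A, the sketch below checks the first cell of the first row, loosely in the spirit of SqlSensor. The function name first_cell_is_truthy is hypothetical, and it assumes the query result has already been materialized as a sequence of row tuples; the real client may return Arrow data that would need converting first.

```python
from typing import Any, Sequence


def first_cell_is_truthy(rows: Sequence[Sequence[Any]]) -> bool:
    """Return True when the first column of the first row is truthy.

    Hypothetical helper: assumes rows are plain tuples/lists, not Arrow tables.
    """
    if not rows:
        # No rows at all: the data has not arrived yet.
        return False
    first_row = rows[0]
    if not first_row:
        # A row with no columns carries no signal either.
        return False
    return bool(first_row[0])
```

With the SELECT 1 ... LIMIT 1 query above, an empty result keeps the sensor poking and a single row containing 1 lets it succeed.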

Option B (follow-up): ergonomic window-existence sensor, modelled on BigQueryTablePartitionExistenceSensor. Removes the boilerplate of hand-writing the SELECT ... LIMIT 1 query.

from airflow.providers.influxdb.sensors.influxdb3 import InfluxDB3MeasurementWindowSensor

wait_for_upstream = InfluxDB3MeasurementWindowSensor(
    task_id="wait_for_upstream",
    influxdb3_conn_id="influxdb3_ro",
    measurement="events",
    window_start="{{ data_interval_start }}",
    window_end="{{ data_interval_end }}",
    deferrable=True,
    poke_interval=60,
    timeout=3600,
)
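
One way Option B might generate the query internally is sketched below. The helper build_window_query is hypothetical and not part of any existing provider API; it assumes timezone-aware datetimes and produces the same half-open window (start inclusive, end exclusive) as the hand-written query in Option A.

```python
from datetime import datetime, timezone


def build_window_query(measurement: str, window_start: datetime, window_end: datetime) -> str:
    """Build a SELECT 1 ... LIMIT 1 existence query for [window_start, window_end)."""
    if window_start.tzinfo is None or window_end.tzinfo is None:
        raise ValueError("window bounds must be timezone-aware")
    if window_start >= window_end:
        raise ValueError("window_start must be earlier than window_end")
    # Double embedded double quotes so the measurement stays a single quoted identifier.
    quoted = '"' + measurement.replace('"', '""') + '"'
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    start = window_start.astimezone(timezone.utc).strftime(fmt)
    end = window_end.astimezone(timezone.utc).strftime(fmt)
    return (
        f"SELECT 1 FROM {quoted} "
        f"WHERE time >= '{start}' AND time < '{end}' "
        "LIMIT 1"
    )
```

Keeping the query construction in a pure function like this would also make the window logic easy to unit test without a running InfluxDB instance.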

Class names (InfluxDB3Sensor, InfluxDB3MeasurementWindowSensor, InfluxDB3PartitionExistenceSensor, etc.) and parameter shapes are placeholders open to maintainer preference.

Acceptance criteria

  • At least the generic SQL-truthy sensor (Option A) landed in airflow.providers.influxdb.sensors.influxdb3. The window-existence variant (Option B) can ship in the same PR or as a follow-up.
  • Deferrable variant from day one (deferrable=True parameter, trigger class implementing the async polling loop). The trigger should use InfluxDBClient3.query_async() (introduced in influxdb3-python 0.12.0) so polling does not hold worker slots. The provider's current influxdb3-python>=0.7.0 lower bound will need to be bumped to >=0.12.0.
  • Unit tests covering match-found, no-match-timeout, and the deferrable path.
  • Integration test against a local InfluxDB 3 instance (or equivalent).
  • Provider documentation with usage examples.
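
The async polling loop mentioned in the criteria above could look roughly like the following. This is a minimal standard-library sketch, not Airflow trigger code: poll_until_found and its check argument are hypothetical names, and check stands in for an awaitable that would run the query (for example via query_async) and report whether a row came back. In Airflow the timeout may well be enforced by the deferral machinery rather than inside the trigger; it is folded in here only to keep the example self-contained.

```python
import asyncio
import time
from typing import Awaitable, Callable


async def poll_until_found(
    check: Callable[[], Awaitable[bool]],
    poke_interval: float,
    timeout: float,
) -> bool:
    """Await check() every poke_interval seconds until it is True or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if await check():
            return True
        # Stop if the next poke would land past the deadline.
        if time.monotonic() + poke_interval > deadline:
            return False
        # Yield control so other coroutines can run between pokes.
        await asyncio.sleep(poke_interval)
```

The match-found and no-match-timeout unit tests listed above map directly onto the two return paths of this loop.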

Scope and tradeoffs

InfluxDBClient3.query_async() is a loop.run_in_executor(None, ...) wrapper around the blocking Apache Arrow Flight calls (see influxdb3-python/influxdb_client_3/query/query_api.py). The interface is a proper coroutine and is suitable for use in a Trigger, but concurrency in the Triggerer is bounded by the executor's thread pool rather than by native async IO. Native async over Arrow Flight is a future improvement on the InfluxData client side, not a blocker for this issue.
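
To make the thread-pool bound concrete, here is a small standard-library sketch of the run_in_executor pattern. It does not use the InfluxDB client: blocking_query is a stand-in that just sleeps, and an explicit ThreadPoolExecutor is passed (instead of None, the default executor) purely so the worker limit is visible. All function names are hypothetical.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_query(delay: float) -> int:
    """Stand-in for a blocking Arrow Flight call; it only sleeps."""
    time.sleep(delay)
    return 1


async def query_async_sketch(executor: ThreadPoolExecutor, delay: float) -> int:
    """Expose the blocking call as a coroutine by running it on a worker thread."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_query, delay)


async def run_many(n_queries: int, max_workers: int, delay: float) -> float:
    """Run n_queries concurrently and return the elapsed wall-clock seconds."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        started = time.monotonic()
        await asyncio.gather(
            *(query_async_sketch(executor, delay) for _ in range(n_queries))
        )
        return time.monotonic() - started
```

Running four queries on two workers takes about two delays instead of one, which is the kind of ceiling a busy Triggerer would hit with many concurrent sensors.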

Use case/motivation

Time-series pipelines built on InfluxDB 3 frequently have inter-DAG data dependencies. One DAG writes a daily aggregate to a measurement; a downstream DAG reads from that measurement to compute a higher-level aggregate. Today the only options are:

  1. Fixed-delay scheduling. Schedule the downstream DAG to run N hours after the upstream, hoping the upstream finishes in time. Brittle: if the upstream runs long, the downstream reads stale or missing data; if it finishes early, the downstream slot sits idle.
  2. Hand-rolled PythonSensor. Every team writes a small wrapper around InfluxDB3Hook to poll for data presence. Duplicated effort, inconsistent semantics, not deferrable by default.

A dedicated sensor fixes both.

Example: a downstream DAG aggregates data written to an InfluxDB measurement by one or more upstream DAGs. Today, the downstream DAG is scheduled with a fixed delay (often hours) after the upstream to give the writes time to land, which exposes it to the stale-data and idle-slot problems described in option 1. A dedicated sensor that polls the upstream measurement would let the downstream DAG start as soon as the data is actually present, rather than relying on a fixed offset.

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!
