Skip to content

InfluxDB3Operator: add deferrable variant #67107

@arpitrathore

Description

@arpitrathore

Description

The InfluxDB3Operator added in #58929 currently executes queries synchronously, which means each task holds an Airflow worker slot for the entire duration of the query. For long-running analytics or aggregation queries over InfluxDB 3 clusters, this prevents the worker from picking up other tasks while the query runs on the database.

This issue proposes adding a deferrable: bool = False parameter (and a separate trigger class) to InfluxDB3Operator, following the pattern already established in:

  • SnowflakeSqlApiOperator (deferrable variant with SnowflakeSqlApiTrigger)
  • BigQueryInsertJobOperator (deferrable mode with BigQueryInsertJobTrigger)
  • RedshiftDataOperator (deferrable with RedshiftDataTrigger)

Proposed API

from airflow.providers.influxdb.operators.influxdb3 import InfluxDB3Operator

InfluxDB3Operator(
    task_id="aggregate_metrics",
    sql="SELECT ... FROM measurement WHERE ...",
    influxdb3_conn_id="influxdb3_default",
    deferrable=True,
    poll_interval=30,
)

When deferrable=True, execute() should:

  1. Submit the query to InfluxDB 3 using InfluxDBClient3.query_async(), the asyncio coroutine exposed by influxdb3-python. (query_async was introduced in influxdb3-python 0.12.0; the influxdb provider currently declares influxdb3-python>=0.7.0, so the deferrable implementation will need to bump the lower bound to >=0.12.0.) For very large result sets, the synchronous query(mode="reader") Apache Arrow Flight stream is also available if a streaming result interface is preferred.
  2. Call self.defer(trigger=InfluxDB3QueryTrigger(...), method_name="execute_complete").
  3. The trigger polls the query state on the InfluxDB cluster at poll_interval and yields a TriggerEvent when the query finishes or fails.
  4. execute_complete() retrieves the result and pushes it to XCom in the same shape as the sync path.

Acceptance criteria

  • New deferrable: bool parameter on InfluxDB3Operator (default False for backward compatibility).
  • New InfluxDB3QueryTrigger (or similar) implementing the async polling loop using influxdb3-python's async client.
  • Unit tests covering both deferrable=True and deferrable=False code paths.
  • Integration test exercising the trigger end-to-end against a local InfluxDB 3 instance (or mocked equivalent if a real one is impractical in CI).
  • Provider documentation updated with a deferrable=True example.

Scope and tradeoffs

  1. InfluxDBClient3.query_async() is a loop.run_in_executor(None, ...) wrapper around the blocking Apache Arrow Flight calls (see influxdb3-python/influxdb_client_3/query/query_api.py). The interface is a proper coroutine and is suitable for use in a Trigger. Concurrency in the Triggerer is bounded by the executor's thread pool rather than by native async IO. Native async over Arrow Flight is a future improvement on the InfluxData client side, not a blocker for this issue. The provider's current influxdb3-python>=0.7.0 lower bound will need to be bumped to >=0.12.0, the release that introduced query_async.
  2. The deferrable variant frees a worker slot while the query runs, but the result still flows back through the operator and into XCom. This is most useful for long-running queries with small-to-moderate result sets (aggregations, freshness probes, dashboard queries). For very large extracts, the existing InfluxDB3Hook + Python task pattern remains the right answer, and this issue does not aim to replace it.

Use case/motivation

Analytics queries on InfluxDB Cloud Dedicated or InfluxDB 3 Enterprise routinely take minutes for windowed aggregations across high-cardinality measurements. In a constrained worker pool, holding a slot per long-running query limits the parallelism the rest of the DAG fleet can achieve. A deferrable variant releases the worker slot back to the pool the moment the query is submitted to InfluxDB. The Triggerer then polls the query state and resumes the task when the result is ready.

This matches established Airflow patterns and unblocks teams that want to standardise on the new InfluxDB3Operator without losing worker-slot efficiency.

Related issues

  • Original InfluxDB 3 support: Add support for influx3 in influxdb provider #58929
  • Snowflake deferrable reference: airflow/providers/snowflake/operators/snowflake.py, airflow/providers/snowflake/triggers/snowflake_trigger.py
  • BigQuery deferrable reference: airflow/providers/google/cloud/operators/bigquery.py, airflow/providers/google/cloud/triggers/bigquery.py

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions