Skip to content

Extend InfluxDB Source/Sink connector to V3 #3062

@ryerraguntla

Description

@ryerraguntla

Description

Extend the current Influx DB connector (#2700) to Influx DB version 3 or InfluxDb3. The implementation should

  • Be Backward compatible with existing versions
  • Client interfaces should be version agnostic
  • Implement InfluxDB V3 features for the Source and Sink connectors
  • Cover the test cases both in unit cases and the integration fixtures

-** Acceptance Criteria

  1. Existing v2 configs work without modification (version defaults to v2)
  2. Setting version = "v3" routes writes to /api/v3/write_lp with Bearer auth
  3. Setting version = "v3" routes source queries to /api/v3/query_sql with JSONL parsing
  4. All shared code (Line Protocol builder, resilience engine, payload codec) has zero duplication
  5. accept_partial = true sends the query param and handles 207 responses on v3
  6. Precision values are transparently mapped (usmicrosecond for v3)
  7. Unit test coverage for both v2 and v3 transport implementations
  8. Integration test fixtures for v3 write and query paths

Component

Connectors

Compare Influx V2 and Influx v3

Dimension InfluxDB v2 InfluxDB v3 Impact
Write Endpoint POST /api/v2/write?org=O&bucket=B&precision=P POST /api/v3/write_lp?db=D&precision=P Different paths, different params (org+bucket vs db)
Write Auth Header Authorization: Token <TOKEN> Authorization: Bearer <TOKEN> Header prefix differs
Write Precision Values ns, us, ms, s auto, nanosecond, microsecond, millisecond, second Enum mapping needed
Write Body Line Protocol Line Protocol Same — heavy lifting shared
Partial Writes Not supported accept_partial=true207 Multi-Status v3-only feature, abstracted
Query Endpoint POST /api/v2/query?org=O (Flux) POST /api/v3/query_sql?db=D (SQL) / POST /api/v3/query_influxql?db=D Different query languages
Query Body JSON {"query": "<Flux>"} Query string params q=<SQL>, format=json|csv|jsonl|parquet Different request construction
Query Response CSV (Flux annotated) JSON / JSONL / CSV / Parquet Different parsers needed
Health Check GET /health GET /health Same
Storage Concept org + bucket database (no org) Config abstraction needed
Auth Model In-DB tokens (/api/v2/authorizations) External (Bearer tokens, IAM) Both use header-based tokens
Compat Endpoints v3 also supports /api/v2/write (compat mode) Fallback possible

Proposed solution

TODO:
A layered approach is possible with backward compatible is feasible. Will keep updating,

Layer 1: Unified Connector Interface (version-agnostic)
Layer 2: Version-Specific HTTP Transport
Layer 3: Shared Heavy-Lifting Engine (100% common code)
Layer 4: External

Layer 1: Unified Connector Interface (version-agnostic)

InfluxConfig (common)

Field Type Description
url String http://localhost:8086
token SecretString Works for both v2 Token & v3 Bearer
version InfluxVersion V2 | V3 ◄── ROUTING KEY
org Option<String> Required for v2, ignored for v3
bucket Option<String> v2 bucket name
database Option<String> v3 database name
precision String "us" → mapped per version
measurement String Line protocol measurement

InfluxClient trait (common interface)

Method Return Type
fn write_line_protocol(&self, body) Result<()>
fn query(&self, query_str) Result<QueryResult>
fn health_check(&self) Result<()>
fn build_write_url(&self) Url
fn build_query_url(&self) Url
fn auth_header(&self) (String, String)
fn map_precision(&self, p: &str) String

Layer 2: Version-Specific HTTP Transport

InfluxV2Client (impl InfluxClient)

Aspect Detail
write_url /api/v2/write?org=O&bucket=B&precision=us
query_url /api/v2/query?org=O — Body: Flux JSON, Response: CSV
auth_header "Token <TOKEN>"
precision_map us → "us", ns → "ns", ms → "ms", s → "s"

InfluxV3Client (impl InfluxClient)

Aspect Detail
write_url /api/v3/write_lp?db=D&precision=microsecond&accept_partial=true
query_url /api/v3/query_sql?db=D — Params: q=<SQL>, Response: JSON/JSONL
auth_header "Bearer <TOKEN>"
precision_map us → "microsecond", ns → "nanosecond", ms → "millisecond", s → "second"

Layer 3: Shared Heavy-Lifting Engine (100% common code)

Line Protocol Builder

write_measurement, write_tag, write_field, append_line, timestamp conversion

  • Measurement escaping (\ , \,, \n, \r)
  • Tag key/value escaping (\ , \,, \=, \n, \r)
  • Field string escaping (\ , \", \n, \r)
  • Payload serialization (JSON / Text / Base64)
  • Metadata tags (stream, topic, partition, offset)
  • Timestamp precision conversion (µs → ns/ms/s)
  • Batch assembly (newline-separated lines)

Query Result Parser

  • CSV parser (v2 Flux response)
  • JSON/JSONL parser (v3 SQL response)
  • Common QueryRow struct output
  • Cursor tracking & dedup logic

Resilience Engine

  • Circuit breaker (threshold + cool-down)
  • Retry middleware (exp backoff + jitter)
  • Connectivity probe (health endpoint)
  • Transient vs permanent error classification

Payload Codec

  • JSON ↔ bytes (simd_json fast path)
  • Text ↔ bytes (UTF-8 validated)
  • Base64 ↔ bytes (raw binary)

Layer 4: External

InfluxDB v2 Server (TSM engine)

Endpoint Auth Addressing
/api/v2/write Token auth org + bucket
/api/v2/query Token auth org + bucket
/health

InfluxDB v3 Server (IOx/Parquet engine)

Endpoint Auth Addressing
/api/v3/write_lp Bearer auth database
/api/v3/query_sql Bearer auth database
/api/v3/query_influxql Bearer auth database
/health

Alternatives considered

  • Separate crate per version: Rejected — High % (> 75 % ) code duplication and will be maintenance burden

No response

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions