Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .config/nextest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@
filter = 'package(integration) and test(cli::system::test_cli_session_scenario::should_be_successful)'
threads-required = "num-cpus"

# Doris testcontainer must bind host:8040 1:1 (FE→BE 307 redirects to
# 127.0.0.1:8040; BE's priority_networks pins the advertise address) and the
# all-in-one image is heavy enough that running it alongside other
# container-backed tests (e.g. elasticsearch) starves their startup timers.
# threads-required = "num-cpus" forces each doris test to run alone, matching
# the existing pattern used elsewhere in this file. The longer slow-timeout
# accounts for cold container boot (~60-90s on Linux CI) plus the test body.
[[profile.default.overrides]]
filter = 'package(integration) and test(/connectors::doris::/)'
threads-required = "num-cpus"
slow-timeout = { period = "60s", terminate-after = 8 }

[profile.default]
slow-timeout = { period = "30s", terminate-after = 4 }

Expand Down
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ members = [
"core/connectors/runtime",
"core/connectors/sdk",
"core/connectors/sinks/delta_sink",
"core/connectors/sinks/doris_sink",
"core/connectors/sinks/elasticsearch_sink",
"core/connectors/sinks/http_sink",
"core/connectors/sinks/iceberg_sink",
Expand Down Expand Up @@ -268,6 +269,7 @@ socket2 = "0.6.3"
sqlx = { version = "0.8.6", features = [
"runtime-tokio-rustls",
"postgres",
"mysql",
"chrono",
"uuid",
"json",
Expand Down
1 change: 1 addition & 0 deletions core/connectors/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ Each sink should have its own, custom configuration, which is passed along with
### Available Sinks
- **Doris Sink** - loads JSON messages into Apache Doris tables via the Stream Load HTTP API
- **Elasticsearch Sink** - sends messages to Elasticsearch indices
- **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog
- **PostgreSQL Sink** - stores messages in PostgreSQL database tables
Expand Down
43 changes: 43 additions & 0 deletions core/connectors/runtime/example_config/connectors/doris_sink.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "sink"
key = "doris"
enabled = true
version = 0
name = "Doris sink"
path = "<BASE_DIR>/target/release/libiggy_connector_doris_sink"
plugin_config_format = "toml"
verbose = false

[[streams]]
stream = "events"
topics = ["doris_events"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "doris_sink"

[plugin_config]
fe_url = "http://localhost:8030"
database = "iggy_demo"
table = "events"
username = "root"
password = "replace_with_secret"
label_prefix = "iggy"
batch_size = 1000
timeout_secs = 30
1 change: 1 addition & 0 deletions core/connectors/sinks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s

| Sink | Description |
| ---- | ----------- |
| **doris_sink** | Loads JSON messages into Apache Doris tables via the Stream Load HTTP API |
| **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics |
| **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage |
| **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas |
Expand Down
45 changes: 45 additions & 0 deletions core/connectors/sinks/doris_sink/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "iggy_connector_doris_sink"
version = "0.1.0"
description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second."
edition = "2024"
license = "Apache-2.0"
keywords = ["iggy", "messaging", "streaming"]
categories = ["command-line-utilities", "database", "network-programming"]
homepage = "https://iggy.apache.org"
documentation = "https://iggy.apache.org/docs"
repository = "https://github.com/apache/iggy"
readme = "../../README.md"

[lib]
crate-type = ["cdylib", "lib"]

[dependencies]
async-trait = { workspace = true }
base64 = { workspace = true }
blake3 = { workspace = true }
bytes = { workspace = true }
iggy_connector_sdk = { workspace = true }
reqwest = { workspace = true }
secrecy = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
simd-json = { workspace = true }
tracing = { workspace = true }
89 changes: 89 additions & 0 deletions core/connectors/sinks/doris_sink/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Apache Doris Sink

The Doris sink connector consumes JSON messages from Iggy streams and writes them to a pre-created Apache Doris table via Doris's [Stream Load HTTP API](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual).

## Requirements

- The target Doris **database and table must be pre-created** before enabling the sink. The connector never issues DDL.
- `database` and `table` config values must match `[A-Za-z0-9_]+`. Anything else is rejected at startup with `Error::InvalidConfigValue` — this also prevents path traversal in the constructed `/api/{db}/{table}/_stream_load` URL.
- Messages must arrive with `Payload::Json` (i.e. the configured stream schema is `json`). Other payload types are dropped with a `warn!` and the consumer offset advances past them — this is silent data loss for any non-JSON message, so the upstream schema must be guaranteed JSON.
- The Iggy message JSON shape must match the target table columns. Use the optional `columns` plugin setting if the column order differs from the JSON keys.

## How it works

1. For each batch of messages, the connector serializes the JSON payloads into a JSON array.
2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream}_{hash8}-{topic}_{hash8}-{partition}-{first_offset}-{last_offset}`.
- Each variable-length segment carries a 32-bit blake3 hash of the raw name, so names that sanitize to the same string (e.g. `events.v1` vs `events_v1`) cannot collide.
- The total label is bounded under Doris's 128-char cap regardless of input length.
- Doris dedupes loads by label inside its `label_keep_max_second` window, so a replayed batch (after restart, retry, etc.) is silently absorbed instead of producing duplicates.
3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `Expect: 100-continue`, `format: json`, `strip_outer_array: true`, `label: <label>`. (`Expect: 100-continue` lets Doris reject auth/4xx failures before the connector uploads the whole body — important for large batches and required if a reverse proxy sits in front of Doris.)
4. The Doris frontend (FE) responds with a `307 Temporary Redirect` to a backend (BE). The connector follows the redirect manually so that the `Authorization` header is preserved across the hop (`reqwest`'s default policy strips it on cross-host redirects). `308 Permanent Redirect` is also followed as a defensive measure; redirects beyond a hard cap of 5 are rejected as `HttpRequestFailed`.
5. The HTTP body is parsed as JSON and the `Status` field decides the outcome:
- `Success` → batch accepted.
- `Label Already Exists` → idempotent replay, treated as success.
- `Publish Timeout` or HTTP `5xx`/`408`/`429` → transient error (`Error::CannotStoreData`); the runtime can retry.
- `Fail`, any other `4xx`, or an unparsable response body → permanent error (`Error::PermanentHttpError`); retrying is not useful.

## Configuration

| Field | Required | Default | Description |
| --- | --- | --- | --- |
| `fe_url` | yes | — | Doris frontend HTTP base URL, e.g. `http://localhost:8030`. |
| `database` | yes | — | Target database. Must match `[A-Za-z0-9_]+`. |
| `table` | yes | — | Target table. Must match `[A-Za-z0-9_]+`. |
| `username` | yes | — | Doris user with `LOAD_PRIV` on the table. |
| `password` | yes | — | Doris user password. Stored as a `secrecy::SecretString` and never logged. |
| `label_prefix` | no | `iggy` | Prefix for the deterministic Stream Load label. |
| `batch_size` | no | `1000` | Maximum number of messages per Stream Load request. |
| `timeout_secs` | no | `30` | Per-request HTTP timeout, in seconds. |
| `max_filter_ratio` | no | unset | Forwarded as the `max_filter_ratio` Stream Load header (maximum tolerated ratio of filtered/error rows before Doris fails the load). |
| `columns` | no | unset | Forwarded as the `columns` Stream Load header. |
| `where` | no | unset | Forwarded as the `where` Stream Load header. |

### Example

```toml
type = "sink"
key = "doris"
enabled = true
version = 0
name = "Doris sink"
path = "target/release/libiggy_connector_doris_sink"
plugin_config_format = "toml"

[[streams]]
stream = "events"
topics = ["doris_events"]
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "doris_sink"

[plugin_config]
fe_url = "http://localhost:8030"
database = "iggy_demo"
table = "events"
username = "root"
password = "replace_with_secret"
label_prefix = "iggy"
batch_size = 1000
timeout_secs = 30
```

## Security notes

- **Use `https://` in production.** The connector accepts `http://` URLs and logs a `warn!` when `fe_url` points at a non-loopback host over plain HTTP, but it does not refuse. Over `http://`, the HTTP Basic credentials travel in cleartext.
- **Trust boundary on the FE.** The connector intentionally preserves the `Authorization` header across the FE → BE 307 redirect (reqwest would otherwise strip it on cross-host redirects). A compromised or MITM'd FE could exfiltrate credentials by responding with `Location: http://attacker/`. The `http://` warning above is the primary mitigation; deploy Doris over TLS in hostile networks.
- **`columns` and `where` are SQL-expression pass-throughs.** Whatever you put in those config fields is forwarded verbatim to Doris's Stream Load and evaluated as a SQL expression. Keep this config trusted.

## Operational guidance

- **`label_keep_max_second`.** Idempotent replay relies on Doris retaining each label for at least as long as it could take the Iggy runtime to redrive a failed batch. The Doris default is 3 days, which is conservative. If you set this lower on the Doris side, make sure your runtime retry budget fits inside the window — once a label expires, a replay re-loads instead of deduping, producing duplicate rows.
- **Filtered-row alerts.** When Doris reports `number_filtered_rows > 0`, the connector emits a `warn!`. This is your signal that upstream message shapes have drifted from the table schema; alert on it.

## Limitations (todo)

- JSON payload only. CSV and raw-text payloads are not supported yet.
- HTTP Basic auth only.
- No automatic table creation.
- No built-in retry middleware or circuit breaker — the runtime decides whether to redrive a failing batch. A hardening pass with `iggy_connector_sdk::retry::*` is planned as a follow-up.
43 changes: 43 additions & 0 deletions core/connectors/sinks/doris_sink/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

type = "sink"
key = "doris"
enabled = true
version = 0
name = "Doris sink"
path = "../../target/release/libiggy_connector_doris_sink"
plugin_config_format = "toml"
verbose = false

[[streams]]
stream = ""
topics = []
schema = "json"
batch_length = 100
poll_interval = "5ms"
consumer_group = "doris_sink"

[plugin_config]
fe_url = "http://localhost:8030"
database = ""
table = ""
username = "root"
password = ""
label_prefix = "iggy"
batch_size = 1000
timeout_secs = 30
Loading
Loading