diff --git a/.config/nextest.toml b/.config/nextest.toml index fb488b6948..47f6b5723d 100644 --- a/.config/nextest.toml +++ b/.config/nextest.toml @@ -21,6 +21,18 @@ filter = 'package(integration) and test(cli::system::test_cli_session_scenario::should_be_successful)' threads-required = "num-cpus" +# Doris testcontainer must bind host:8040 1:1 (FE→BE 307 redirects to +# 127.0.0.1:8040; BE's priority_networks pins the advertise address) and the +# all-in-one image is heavy enough that running it alongside other +# container-backed tests (e.g. elasticsearch) starves their startup timers. +# threads-required = "num-cpus" forces each doris test to run alone, matching +# the existing pattern used elsewhere in this file. The longer slow-timeout +# accounts for cold container boot (~60-90s on Linux CI) plus the test body. +[[profile.default.overrides]] +filter = 'package(integration) and test(/connectors::doris::/)' +threads-required = "num-cpus" +slow-timeout = { period = "60s", terminate-after = 8 } + [profile.default] slow-timeout = { period = "30s", terminate-after = 4 } diff --git a/Cargo.lock b/Cargo.lock index 3a48443372..3ee05b419d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6563,6 +6563,23 @@ dependencies = [ "url", ] +[[package]] +name = "iggy_connector_doris_sink" +version = "0.1.0" +dependencies = [ + "async-trait", + "base64", + "blake3", + "bytes", + "iggy_connector_sdk", + "reqwest 0.13.3", + "secrecy", + "serde", + "serde_json", + "simd-json", + "tracing", +] + [[package]] name = "iggy_connector_elasticsearch_sink" version = "0.4.0" @@ -7031,6 +7048,7 @@ dependencies = [ "iggy-cli", "iggy_binary_protocol", "iggy_common", + "iggy_connector_doris_sink", "iggy_connector_sdk", "jsonwebtoken", "keyring", diff --git a/Cargo.toml b/Cargo.toml index 949a498f2a..fe2413af0a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ members = [ "core/connectors/runtime", "core/connectors/sdk", "core/connectors/sinks/delta_sink", + "core/connectors/sinks/doris_sink", 
"core/connectors/sinks/elasticsearch_sink", "core/connectors/sinks/http_sink", "core/connectors/sinks/iceberg_sink", @@ -268,6 +269,7 @@ socket2 = "0.6.3" sqlx = { version = "0.8.6", features = [ "runtime-tokio-rustls", "postgres", + "mysql", "chrono", "uuid", "json", diff --git a/core/connectors/README.md b/core/connectors/README.md index cdef67789a..c36624b1a8 100644 --- a/core/connectors/README.md +++ b/core/connectors/README.md @@ -78,6 +78,7 @@ Each sink should have its own, custom configuration, which is passed along with ### Available Sinks +- **Doris Sink** - loads JSON messages into Apache Doris tables via the Stream Load HTTP API - **Elasticsearch Sink** - sends messages to Elasticsearch indices - **Iceberg Sink** - writes data to Apache Iceberg tables via REST catalog - **PostgreSQL Sink** - stores messages in PostgreSQL database tables diff --git a/core/connectors/runtime/example_config/connectors/doris_sink.toml b/core/connectors/runtime/example_config/connectors/doris_sink.toml new file mode 100644 index 0000000000..bbc167397d --- /dev/null +++ b/core/connectors/runtime/example_config/connectors/doris_sink.toml @@ -0,0 +1,43 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +type = "sink" +key = "doris" +enabled = true +version = 0 +name = "Doris sink" +path = "/target/release/libiggy_connector_doris_sink" +plugin_config_format = "toml" +verbose = false + +[[streams]] +stream = "events" +topics = ["doris_events"] +schema = "json" +batch_length = 100 +poll_interval = "5ms" +consumer_group = "doris_sink" + +[plugin_config] +fe_url = "http://localhost:8030" +database = "iggy_demo" +table = "events" +username = "root" +password = "replace_with_secret" +label_prefix = "iggy" +batch_size = 1000 +timeout_secs = 30 diff --git a/core/connectors/sinks/README.md b/core/connectors/sinks/README.md index 55c639c336..fc0c83e1db 100644 --- a/core/connectors/sinks/README.md +++ b/core/connectors/sinks/README.md @@ -8,6 +8,7 @@ Sink connectors are responsible for writing data from Iggy streams to external s | Sink | Description | | ---- | ----------- | +| **doris_sink** | Loads JSON messages into Apache Doris tables via the Stream Load HTTP API | | **elasticsearch_sink** | Sends messages to Elasticsearch indices for full-text search and analytics | | **iceberg_sink** | Writes data to Apache Iceberg tables via REST catalog with S3/GCS/Azure storage | | **postgres_sink** | Stores messages in PostgreSQL database tables with configurable schemas | diff --git a/core/connectors/sinks/doris_sink/Cargo.toml b/core/connectors/sinks/doris_sink/Cargo.toml new file mode 100644 index 0000000000..385cbb50c3 --- /dev/null +++ b/core/connectors/sinks/doris_sink/Cargo.toml @@ -0,0 +1,45 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iggy_connector_doris_sink" +version = "0.1.0" +description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." +edition = "2024" +license = "Apache-2.0" +keywords = ["iggy", "messaging", "streaming"] +categories = ["command-line-utilities", "database", "network-programming"] +homepage = "https://iggy.apache.org" +documentation = "https://iggy.apache.org/docs" +repository = "https://github.com/apache/iggy" +readme = "../../README.md" + +[lib] +crate-type = ["cdylib", "lib"] + +[dependencies] +async-trait = { workspace = true } +base64 = { workspace = true } +blake3 = { workspace = true } +bytes = { workspace = true } +iggy_connector_sdk = { workspace = true } +reqwest = { workspace = true } +secrecy = { workspace = true } +serde = { workspace = true } +serde_json = { workspace = true } +simd-json = { workspace = true } +tracing = { workspace = true } diff --git a/core/connectors/sinks/doris_sink/README.md b/core/connectors/sinks/doris_sink/README.md new file mode 100644 index 0000000000..89864bc688 --- /dev/null +++ b/core/connectors/sinks/doris_sink/README.md @@ -0,0 +1,89 @@ +# Apache Doris Sink + +The Doris sink connector consumes JSON messages from Iggy streams and writes them to a pre-created Apache Doris table via Doris's [Stream Load HTTP API](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual). 
+ +## Requirements + +- The target Doris **database and table must be pre-created** before enabling the sink. The connector never issues DDL. +- `database` and `table` config values must match `[A-Za-z0-9_]+`. Anything else is rejected at startup with `Error::InvalidConfigValue` — this also prevents path traversal in the constructed `/api/{db}/{table}/_stream_load` URL. +- Messages must arrive with `Payload::Json` (i.e. the configured stream schema is `json`). Other payload types are dropped with a `warn!` and the consumer offset advances past them — this is permanent data loss for any non-JSON message, visible only in the logs, so the upstream schema must be guaranteed JSON. +- The Iggy message JSON shape must match the target table columns. Use the optional `columns` plugin setting if the column order differs from the JSON keys. + +## How it works + +1. For each batch of messages, the connector serializes the JSON payloads into a JSON array. +2. It computes a deterministic Stream Load `label` of the form `{label_prefix}-{stream}_{hash8}-{topic}_{hash8}-{partition}-{first_offset}-{last_offset}`. + - Each variable-length segment carries a 32-bit blake3 hash of the raw name, so names that sanitize to the same string (e.g. `events.v1` vs `events_v1`) do not collide unless their 32-bit hashes also match. + - The total label is bounded under Doris's 128-char cap regardless of input length. + - Doris dedupes loads by label inside its `label_keep_max_second` window, so a replayed batch (after restart, retry, etc.) is silently absorbed instead of producing duplicates. +3. It `PUT`s the batch to `{fe_url}/api/{database}/{table}/_stream_load` with HTTP Basic auth and the headers `Expect: 100-continue`, `format: json`, `strip_outer_array: true`, `label: