Opening this per @hubcio's suggestion in #2927 ("this issue needs to be opened as discussion where we dump ideas how to fix this correctly … please do it"). Four threads are circling the same problem from different sides:
They're one design. This post tries to assemble what each of you has already concluded into a single coherent shape, fill the gaps, and propose a delivery order. I've built and run the receipt-gated, quarantine-before-advance pattern on Iggy streams myself, so this is grounded in having lived with it — but it's deliberately framed around the conclusions already in these threads.
Who owns what (the part that keeps coming up)
| Concern | Owner | From |
|---|---|---|
| Offset commit: gate on the i32, advance only on success | runtime (it owns IggyConsumer; store_offset is runtime-side, no callback) | |
The one thing to nail: the i32 is not cosmetic (you settled this, @hubcio — "commit after success already needs the runtime to read the return"), but it stays binary — 0 commits, non-zero doesn't, and the runtime never classifies/retries/routes on it. Classification is the sink's; routing is per-layer.
Three layers, delivered in order
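| Layer | Capability | Mechanism | ABI | Closes |
|---|---|---|---|---|
| 1 | At-least-once | AutoCommit::Disabled + binary i32 + commit to last contiguous batch success + circuit-breaker halt | no change | #2927, #2928 |
| 2a | Framework DLQ | runtime routes its own decode/transform/serialize failures to the DLQ topic | no change | |
| 2b | Sink-write DLQ | sink declares per-record verdicts via an out-param buffer on consume_v2; runtime routes them | one additive symbol | |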
Mapping to Kafka Connect (the model you reached for, @hubcio): Layer 1 ≈ preCommit; 2a ≈ converter/transform DLQ under errors.tolerance; 2b ≈ ErrantRecordReporter; the topic ≈ errors.deadletterqueue.topic.name.
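Layer 1 — at-least-once (no ABI change)
Your #3203 design, @hubcio, completed:
- AtLeastOnce → AutoCommit::Disabled; the consume loop binds the i32 (today it is unbound at sink.rs:740), and:
  - 0: advance a contiguous watermark and call consumer.store_offset(...) (monotonic, consumer.rs:438, so a re-commit after replay is a no-op);
  - non-zero: don't return Err (that's #3061/#3180, which kills the task), and don't step over the gap; re-run the in-hand batch (the sink's in-call retry handles transient blips), and let a consecutive-failure CircuitBreaker halt/pause the partition. That halt is exactly where Layer 2b later swaps in "dead-letter and advance."
Two subtleties worth stating so the implementation doesn't trip on them: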
"Last contiguous success," not "withhold and continue." A live consumer advances an in-memory cursor on next() and dedups client-side when !allow_replay (consumer.rs:712-719), so withholding a commit doesn't replay to a running consumer — only a restart does. And committing a success that follows a failure advances the watermark past the gap and drops it. So: never step over a gap; halt at it; restart replays from the last contiguous commit.
Granularity. With a binary per-batch i32, the contiguous watermark is necessarily batch-granular. Message-granular partial commit needs the per-record channel from Layer 2b.
Caveat (yours, @hubcio): at-least-once leans on idempotent sinks; ES auto-id and Iceberg catalog-commit aren't — AtMostOnce stays the default.
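The Layer 1 gate can be sketched in a few lines. This assumes one partition processed one batch at a time. CommitGate, Next, and the max_failures threshold are hypothetical names, not existing runtime types, and a real runtime would call consumer.store_offset(...) wherever this returns Next::Commit. The sketch shows the binary contract: 0 advances a monotonic watermark. Any non-zero value commits nothing, keeps the batch in hand, and counts toward a consecutive-failure breaker.

```rust
/// What the runtime does after one consume() call returns (hypothetical).
#[derive(Debug, PartialEq)]
pub enum Next {
    /// Store this offset, then fetch the next batch.
    Commit(u64),
    /// Commit nothing and re-run the batch already in hand.
    RetryBatch,
    /// Circuit breaker tripped: halt/pause the partition at the gap.
    Halt,
}

/// Hypothetical Layer 1 gate for a single partition.
pub struct CommitGate {
    committed: Option<u64>, // highest offset covered by a contiguous run of successes
    consecutive_failures: u32,
    max_failures: u32, // breaker threshold (assumed to be configurable)
}

impl CommitGate {
    pub fn new(max_failures: u32) -> Self {
        CommitGate { committed: None, consecutive_failures: 0, max_failures }
    }

    pub fn committed(&self) -> Option<u64> {
        self.committed
    }

    /// `last_offset` is the highest offset in the batch; `rc` is the sink's binary i32.
    pub fn on_batch_result(&mut self, last_offset: u64, rc: i32) -> Next {
        if rc == 0 {
            self.consecutive_failures = 0;
            // Monotonic: re-committing a replayed batch never moves the watermark back.
            let watermark = self.committed.map_or(last_offset, |c| c.max(last_offset));
            self.committed = Some(watermark);
            return Next::Commit(watermark);
        }
        // Every non-zero value is handled identically: no classification or routing.
        // The batch is re-run rather than skipped, so no later success can
        // carry the watermark past this gap.
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.max_failures {
            Next::Halt
        } else {
            Next::RetryBatch
        }
    }
}
```

Because a failed batch is retried in place rather than skipped, the watermark is contiguous by construction; the caller is responsible for feeding the same batch back after RetryBatch.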
Layer 2a — framework DLQ (the half the runtime sees, no ABI change)
Decode/transform/serialize failures are today logged with error! and dropped (sink.rs:584-707). The runtime already has the record and the error, so it can route them to the DLQ topic itself: per-record, runtime-owned, no FFI. This is the most landable DLQ slice: it turns an existing silent drop into a durable, inspectable quarantine, and it exercises the envelope and the router that 2b reuses.
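Layer 2a's runtime-side routing could look roughly like this. The sketch assumes the per-record pipeline is available as a closure that returns either the serialized output or the failing stage plus its error. Stage, DeadLetter, and process_batch are hypothetical names. The point is that the original payload bytes travel to the DLQ untouched instead of being dropped after error!.

```rust
/// Stage at which the runtime itself saw the failure (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Stage {
    Decode,
    Transform,
    Serialize,
}

/// A record bound for the DLQ: original bytes plus why it failed.
#[derive(Debug, PartialEq)]
pub struct DeadLetter {
    pub offset: u64,
    pub reason: String,
    pub payload: Vec<u8>, // byte-for-byte copy of the source message
}

/// Split a batch into records for the sink and dead-letters for the DLQ router.
pub fn process_batch<F>(batch: Vec<(u64, Vec<u8>)>, mut pipeline: F) -> (Vec<Vec<u8>>, Vec<DeadLetter>)
where
    F: FnMut(&[u8]) -> Result<Vec<u8>, (Stage, String)>,
{
    let mut for_sink = Vec::new();
    let mut dead = Vec::new();
    for (offset, payload) in batch {
        match pipeline(&payload) {
            Ok(out) => for_sink.push(out),
            // Instead of error! + drop, keep the record and the error together.
            Err((stage, err)) => dead.push(DeadLetter {
                offset,
                reason: format!("{stage:?}: {err}"),
                payload,
            }),
        }
    }
    (for_sink, dead)
}
```

The dead-letters returned here would then go to the same router that 2b feeds, with the commit gated on that produce.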
Layer 2b — sink-write DLQ (the poison case, one additive symbol)
The half the runtime can't see: a record the sink permanently fails to write. The sink declares per-record verdicts; the runtime routes them. The sink hands them over via an out-param disposition buffer on a new additive consume_v2 symbol (existing plugins that don't export it fall back to consume() and get Layer-1 replay only):
The composition with Layer 1's binary i32 is what makes this clean:
```text
consume_v2: sink writes good records to the target, appends {offset, reason} for each PERMANENT
per-record failure to the buffer, then:
  • returns 0  → every record classified (target OR buffer). Runtime produces the buffered
                 dead-letters in ONE batch, gates the commit on that produce, advances. Poison
                 quarantined; partition moves on. (#2927, solved.)
  • returns !0 → couldn't process the batch at all (target down, i.e. transient). Layer 1 takes over.

Per-record permanent → buffer (2b); whole-batch transient → non-zero (Layer 1). The i32 stays
binary; no "richer return code" needed. The sink only DECLARES; the runtime owns produce +
receipt-gate + on_dlq_failure + commit, entirely runtime-side, never across the FFI.
```
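The runtime's half of that composition reduces to a small decision function. This is a sketch under assumptions: produce_dlq stands in for a single batched DLQ send whose Ok means broker acceptance, dead_offsets stands in for the verdicts read back from the buffer, and OnDlqFailure and Outcome are hypothetical types mirroring the on_dlq_failure values in the [dlq] config.

```rust
/// Policy when the DLQ produce itself fails (mirrors on_dlq_failure).
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum OnDlqFailure {
    StopConnector,
    DropAndContinue,
}

#[derive(Debug, PartialEq)]
pub enum Outcome {
    /// Every record is either in the target or in the DLQ: store the offset.
    Commit(u64),
    /// Non-zero return: nothing committed; Layer 1 re-runs the batch.
    Replay,
    /// DLQ produce failed under stop_connector: halt without committing.
    Stop,
}

pub fn after_consume_v2<P>(
    rc: i32,
    last_offset: u64,
    dead_offsets: &[u64],
    policy: OnDlqFailure,
    mut produce_dlq: P,
) -> Outcome
where
    P: FnMut(&[u64]) -> Result<(), String>,
{
    if rc != 0 {
        return Outcome::Replay;
    }
    if dead_offsets.is_empty() {
        return Outcome::Commit(last_offset);
    }
    // One produce for all dead-letters in the batch; the commit is gated on its result.
    match produce_dlq(dead_offsets) {
        Ok(()) => Outcome::Commit(last_offset),
        Err(_) => match policy {
            OnDlqFailure::StopConnector => Outcome::Stop,
            // The poison records are lost; that is what this policy opts into.
            OnDlqFailure::DropAndContinue => Outcome::Commit(last_offset),
        },
    }
}
```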
On the verdict channel — the one real decision. The alternative to the out-param is an injected reporter callback, i.e. exactly @kriti-sc's DlqCallback and Kafka Connect's ErrantRecordReporter. Your instinct that the sink must drive per-record routing is right either way; this is only about the channel:
| | out-param (recommended) | reporter callback (alternative) |
|---|---|---|
| Mechanism ownership | sink only declares; runtime does produce/receipt/commit | sink calls produce via the callback and must handle its -1 |
| Runtime machinery | none (synchronous out-pointer) | needs a bare-fn-pointer registry/trampoline (the SINK_*_SENDERS shape you argued against for commit, @hubcio) |
| Batching | one DLQ produce per batch | one per report() call unless buffered |
| 2a/2b unification | both feed one router | reporter is a push API the runtime can't call on itself for 2a |
I lean out-param for the cleaner split + no registry + the 2a/2b router unification; the reporter's edge is familiarity and leaving consume() alone. It's isolated enough that flipping it later touches only the channel — not the envelope, router, or config. Happy to expand the full trade-off (buffer-sizing protocol, fallback) if useful.
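For the out-param channel, the handshake across consume_v2 might look like this on the Rust side. Disposition, DispositionBuf, the numeric class code, and the target_available flag (a stand-in for the sink's own view of whether its target is reachable) are all hypothetical. The exported extern "C" signature and the raw message batch are elided. The sketch shows one possible buffer-sizing rule: the runtime allocates one slot per record, so a sink that emits at most one verdict per record can never overflow the buffer.

```rust
/// Permanent, per-record failure classes (hypothetical subset).
#[repr(u32)]
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Class {
    Rejected = 1,
}

/// One verdict, {offset, reason}, as a fixed-size FFI-safe record.
#[repr(C)]
#[derive(Debug, Clone, Copy, PartialEq, Default)]
pub struct Disposition {
    pub offset: u64,
    pub class: u32,
}

/// Out-param buffer as it would cross the FFI: the runtime owns the memory
/// and sets `cap`; the sink only writes elements and bumps `len`.
#[repr(C)]
pub struct DispositionBuf {
    pub ptr: *mut Disposition,
    pub cap: usize,
    pub len: usize,
}

impl DispositionBuf {
    /// Append one verdict, refusing (instead of overflowing) when full.
    /// Safety: `ptr` must point to `cap` writable `Disposition`s.
    unsafe fn push(&mut self, d: Disposition) -> bool {
        if self.len == self.cap {
            return false;
        }
        unsafe { self.ptr.add(self.len).write(d) };
        self.len += 1;
        true
    }
}

/// Sink side: write good records, declare poison ones, return the binary i32.
pub fn sink_consume_v2(batch: &[(u64, &[u8])], target_available: bool, out: &mut DispositionBuf) -> i32 {
    if !target_available {
        // Whole-batch transient failure: no verdicts, non-zero, Layer 1 replays.
        out.len = 0;
        return -1;
    }
    for &(offset, payload) in batch {
        if payload.is_empty() {
            // Stand-in for a record the target permanently rejects.
            let d = Disposition { offset, class: Class::Rejected as u32 };
            // SAFETY: the runtime allocated `cap` elements behind `ptr`.
            let pushed = unsafe { out.push(d) };
            if !pushed {
                return -1;
            }
        }
        // else: write `payload` to the target (elided).
    }
    0
}

/// Runtime side: one slot per record, call the sink, keep what it wrote.
pub fn call_consume_v2(batch: &[(u64, &[u8])], target_available: bool) -> (i32, Vec<Disposition>) {
    let mut slots = vec![Disposition::default(); batch.len()];
    let mut buf = DispositionBuf { ptr: slots.as_mut_ptr(), cap: slots.len(), len: 0 };
    let rc = sink_consume_v2(batch, target_available, &mut buf);
    slots.truncate(buf.len);
    (rc, slots)
}
```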
The envelope (the shared primitive — @spetz's "source offset, headers, payload")
Used by both 2a and 2b. Payload byte-for-byte (redrive is a straight replay; metadata stays readable even when the body is poison/binary — no base64). Metadata in user_headers, namespaced iggy_dlq_*:
pub const keys + a closed DlqErrorClass enum live in iggy_connector_sdk (a lib), so the runtime, connectors, and any redrive tooling share one definition. One gotcha worth flagging now: HeaderValue rejects empty or >255-byte values, and a reason built from an error string routinely exceeds that — so reason must be clamped at the SDK boundary, or a fat error makes a message fail its own quarantine (and crash-loop under a halt policy).
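A sketch of the shared SDK definitions and the clamp follows. Only iggy_dlq_original_id and iggy_dlq_offset are named in this proposal. The reason and class key names, the DlqErrorClass variants, the "unspecified" placeholder, and the "..." marker are hypothetical. The 255-byte bound is the HeaderValue limit described above and should be checked against the SDK before relying on it. Truncation backs off to a UTF-8 character boundary so the slice cannot panic.

```rust
pub const DLQ_HDR_ORIGINAL_ID: &str = "iggy_dlq_original_id";
pub const DLQ_HDR_OFFSET: &str = "iggy_dlq_offset";
pub const DLQ_HDR_REASON: &str = "iggy_dlq_reason"; // hypothetical key name
pub const DLQ_HDR_CLASS: &str = "iggy_dlq_class"; // hypothetical key name

/// Closed set of error classes (variants are illustrative, not agreed).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DlqErrorClass {
    Decode,
    Transform,
    Serialize,
    SinkRejected,
}

impl DlqErrorClass {
    pub fn as_str(self) -> &'static str {
        match self {
            DlqErrorClass::Decode => "decode",
            DlqErrorClass::Transform => "transform",
            DlqErrorClass::Serialize => "serialize",
            DlqErrorClass::SinkRejected => "sink_rejected",
        }
    }
}

/// Upper bound on a header value, per the limit described above.
pub const MAX_HEADER_VALUE_BYTES: usize = 255;

/// Make any error string a legal header value: never empty, never longer
/// than MAX_HEADER_VALUE_BYTES, never split inside a UTF-8 character.
pub fn clamp_reason(reason: &str) -> String {
    let trimmed = reason.trim();
    if trimmed.is_empty() {
        return "unspecified".to_string();
    }
    if trimmed.len() <= MAX_HEADER_VALUE_BYTES {
        return trimmed.to_string();
    }
    const MARKER: &str = "...";
    let mut end = MAX_HEADER_VALUE_BYTES - MARKER.len();
    while !trimmed.is_char_boundary(end) {
        end -= 1; // index 0 is always a boundary, so this terminates
    }
    format!("{}{}", &trimmed[..end], MARKER)
}
```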
DlqConfig mirrors the optional transforms section and StreamProducerConfig (required stream+topic):
```toml
[dlq]
stream = "dlq"
topic = "http_sink"
on_dlq_failure = "stop_connector"   # or "drop_and_continue"
auto_create_topic = true            # net-new: no connector creates topics today
```
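Here is one way the runtime might hold that table in memory, sketched with std only. The real loader presumably goes through the runtime's existing config deserialization, which is not shown, and DlqConfig::from_pairs is hypothetical. Because the on_dlq_failure default is still an open question, the sketch makes the key required rather than picking a default. Treating a missing auto_create_topic as false is also an assumption.

```rust
use std::str::FromStr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnDlqFailure {
    StopConnector,
    DropAndContinue,
}

impl FromStr for OnDlqFailure {
    type Err = String;
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "stop_connector" => Ok(Self::StopConnector),
            "drop_and_continue" => Ok(Self::DropAndContinue),
            other => Err(format!("unknown on_dlq_failure value: {other}")),
        }
    }
}

/// Hypothetical in-memory form of the [dlq] table.
#[derive(Debug, Clone, PartialEq)]
pub struct DlqConfig {
    pub stream: String, // required, as in StreamProducerConfig
    pub topic: String,  // required
    pub on_dlq_failure: OnDlqFailure,
    pub auto_create_topic: bool,
}

impl DlqConfig {
    /// Build from already-parsed key/value pairs of the [dlq] table.
    pub fn from_pairs(pairs: &[(&str, &str)]) -> Result<Self, String> {
        let get = |key: &str| pairs.iter().find(|(k, _)| *k == key).map(|(_, v)| *v);
        let stream = get("stream").ok_or("dlq.stream is required")?.to_string();
        let topic = get("topic").ok_or("dlq.topic is required")?.to_string();
        let on_dlq_failure = get("on_dlq_failure")
            .ok_or("dlq.on_dlq_failure is required")?
            .parse::<OnDlqFailure>()?;
        let auto_create_topic = match get("auto_create_topic") {
            Some(v) => v.parse::<bool>().map_err(|e| e.to_string())?,
            None => false,
        };
        Ok(DlqConfig { stream, topic, on_dlq_failure, auto_create_topic })
    }
}
```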
On the longer game (@numinnex / @spetz): the SDK primitive boundary is drawn so the storage backend can later swap from "a dedicated topic" to the embedded KV-store pointer-variant (store an offset, not a deep copy) without connectors changing. Topic now; KV-store-ready seam.
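The seam can be pictured as a trait that the router is written against once. DlqStore, TopicStore, PointerStore, and route are hypothetical names. A real pointer variant would also need the source stream, topic, and partition; this sketch keeps only the offset to show the difference between a deep copy and a pointer.

```rust
/// What a backend is asked to keep for one quarantined record.
pub struct Quarantined<'a> {
    pub offset: u64,
    pub reason: &'a str,
    pub payload: &'a [u8],
}

/// Hypothetical SDK seam: the runtime and connectors only see this trait.
pub trait DlqStore {
    fn put(&mut self, rec: &Quarantined<'_>) -> Result<(), String>;
}

/// Today: a dedicated topic, holding a deep copy of the payload.
#[derive(Default)]
pub struct TopicStore {
    pub messages: Vec<(u64, String, Vec<u8>)>,
}

impl DlqStore for TopicStore {
    fn put(&mut self, rec: &Quarantined<'_>) -> Result<(), String> {
        self.messages.push((rec.offset, rec.reason.to_string(), rec.payload.to_vec()));
        Ok(())
    }
}

/// Later: a KV-store pointer variant that records the offset, not the bytes.
#[derive(Default)]
pub struct PointerStore {
    pub entries: Vec<(u64, String)>,
}

impl DlqStore for PointerStore {
    fn put(&mut self, rec: &Quarantined<'_>) -> Result<(), String> {
        self.entries.push((rec.offset, rec.reason.to_string()));
        Ok(())
    }
}

/// The router depends only on the trait, so swapping backends leaves it untouched.
pub fn route(store: &mut dyn DlqStore, batch: &[Quarantined<'_>]) -> Result<usize, String> {
    for rec in batch {
        store.put(rec)?;
    }
    Ok(batch.len())
}
```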
Delivery semantics (precisely)
at-most-once → at-least-once, with two honest caveats: a DLQ send⇒Ok is broker-acceptance, not fsync (optional flush(fsync=true) before commit for stronger durability); and duplicates are batch-granular (a crash between DLQ-write and commit re-delivers — dedupe on iggy_dlq_original_id + iggy_dlq_offset; non-idempotent sinks may duplicate an already-written prefix). The guarantee is "never silently lost," not "never duplicated."
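Redrive-side deduplication on iggy_dlq_original_id + iggy_dlq_offset could be as simple as the following. DeadLetter and dedupe are hypothetical names, and original_id is assumed to be the source message's u128 id. The first occurrence of each (id, offset) pair is kept; the copies that a crash between the DLQ write and the commit would produce are dropped.

```rust
use std::collections::HashSet;

/// A dead-letter as read back from the DLQ topic (fields taken from its headers).
#[derive(Debug, Clone, PartialEq)]
pub struct DeadLetter {
    pub original_id: u128, // assumed type of the source message id
    pub offset: u64,
    pub payload: Vec<u8>,
}

/// Keep the first occurrence of each (original_id, offset) pair, preserving order.
pub fn dedupe(letters: Vec<DeadLetter>) -> Vec<DeadLetter> {
    let mut seen = HashSet::new();
    letters
        .into_iter()
        .filter(|d| seen.insert((d.original_id, d.offset)))
        .collect()
}
```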
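Sequencing
This keeps at-least-once unblocked (your point, @hubcio) while the whole design stays coherent.
Open questions
- Verdict channel for 2b: out-param or an ErrantRecordReporter-style reporter? The one decision needing a call.
- … pub const DLQ_HDR_* + DlqErrorClass belong in iggy_connector_sdk so redrive tooling can depend on them.
- … (dlq stream / <connector_key> topic, 1 partition)?
- on_dlq_failure default: stop_connector (correctness-first) or drop_and_continue?
- Durability: broker-acceptance (default) or flush(fsync=true) before commit for the DLQ?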
I can take Layer 1 as the first PR (it closes #2927/#2928, which I filed) — and draft the envelope + framework DLQ as the second. Does this synthesis match where you all landed, and is the out-param the right call for 2b?
sink-dlq-verdict-channel-alternatives.md
connector-delivery-guarantees-and-dlq.md