
feat(message_bus): add QUIC, TCP-TLS, WS, WSS transports for SDK clients#3192

Merged
numinnex merged 27 commits into master from feat/message-bus-multi-transport
May 4, 2026

Conversation

Contributor

@hubcio hubcio commented Apr 28, 2026

Replica plane stays TCP forever: VSR FIFO + view-change timing,
fd-delegation, writev batching all rely on plaintext between trusted
replicas. SDK-client plane gains four transports alongside TCP:

  • QUIC: shard-0 terminal (compio-quic CID demux), 1 bidi stream per
    peer, 0-RTT off + listener defense-in-depth reject.
  • TCP-TLS: rustls 1.3, no client auth, 0-RTT off, compio-tls behind
    unified TransportConn::run with bounded close_grace shutdown.
  • WS: compio-ws over plaintext TCP; pre-upgrade fd cross-shard
    handover keeps fd-delegation on plain TCP only.
  • WSS: WebSocketStream over TlsStream; both handshakes run on the
    per-connection install task.

Shared: TransportListener / TransportConn trait family; WebSocketConfig

  • close_grace threaded through MessageBusConfig and applied uniformly
    across TCP-TLS, WS, WSS; bounded safe-shutdown (no select! over
    stream.shutdown); single-task pump per WS/WSS using compio-ws
    cancel-safe read.
  • Bus auth thin: both planes connect unauthenticated; server-ng gates
    via LOGIN_USER / LOGIN_WITH_PAT and future LOGIN_REPLICA. Ping
    announces replica_id only; no subprotocol, no ALPN, no MAC.
  • Per-connection metadata flows via IggyMessageBus::client_meta;
    ShardFramePayload setup variants carry ClientConnMeta end to end.


codecov Bot commented Apr 28, 2026

Codecov Report

❌ Patch coverage is 88.76207% with 384 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.44%. Comparing base (f02a43a) to head (2542853).
⚠️ Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
core/message_bus/src/replica/io.rs 82.91% 41 Missing and 7 partials ⚠️
core/message_bus/src/transports/wss.rs 89.26% 33 Missing and 5 partials ⚠️
core/configs/src/server_ng_config/websocket.rs 0.00% 34 Missing ⚠️
core/message_bus/src/transports/quic.rs 92.01% 22 Missing and 5 partials ⚠️
core/message_bus/src/transports/ws.rs 90.74% 22 Missing and 3 partials ⚠️
core/configs/src/server_ng_config/displays.rs 0.00% 24 Missing ⚠️
core/message_bus/src/installer/replica.rs 85.44% 19 Missing and 4 partials ⚠️
core/message_bus/src/transports/tcp_tls.rs 94.89% 15 Missing and 7 partials ⚠️
core/message_bus/src/installer/mod.rs 52.63% 18 Missing ⚠️
core/message_bus/src/config.rs 81.69% 7 Missing and 6 partials ⚠️
... and 20 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master    #3192      +/-   ##
============================================
+ Coverage     74.10%   74.44%   +0.33%     
  Complexity      943      943              
============================================
  Files          1164     1183      +19     
  Lines        103048   105866    +2818     
  Branches      80083    82916    +2833     
============================================
+ Hits          76364    78812    +2448     
- Misses        23996    24284     +288     
- Partials       2688     2770      +82     
Components Coverage Δ
Rust Core 75.73% <88.76%> (+0.40%) ⬆️
Java SDK 60.14% <ø> (ø)
C# SDK 69.07% <ø> (-0.31%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.53% <ø> (+0.13%) ⬆️
Go SDK 39.60% <ø> (ø)
Files with missing lines Coverage Δ
core/binary_protocol/src/consensus/header.rs 80.86% <100.00%> (+0.29%) ⬆️
core/binary_protocol/src/consensus/iobuf.rs 35.20% <100.00%> (+2.37%) ⬆️
core/configs/src/server_ng_config/defaults.rs 100.00% <100.00%> (ø)
core/configs/src/server_ng_config/server_ng.rs 40.98% <ø> (-2.77%) ⬇️
core/message_bus/src/connector.rs 94.31% <100.00%> (+1.46%) ⬆️
core/message_bus/src/installer/conn_info.rs 100.00% <100.00%> (ø)
core/message_bus/src/installer/quic.rs 100.00% <100.00%> (ø)
core/message_bus/src/installer/ws.rs 100.00% <100.00%> (ø)
core/message_bus/src/lifecycle/shutdown.rs 99.09% <100.00%> (+0.54%) ⬆️
core/message_bus/src/socket_opts.rs 100.00% <ø> (ø)
... and 33 more

... and 31 files with indirect coverage changes


Contributor

@atharvalade atharvalade left a comment


Found these during the first round of review... I'll continue reviewing later. Overall it seems good.

Comment threads:
  • core/message_bus/src/transports/ws.rs (outdated)
  • core/message_bus/src/transports/ws.rs
  • core/server-ng/src/dedup.rs (outdated)
  • core/message_bus/src/transports/ws.rs
@hubcio hubcio force-pushed the feat/message-bus-multi-transport branch 2 times, most recently from 75ab18b to c9f5aeb on April 30, 2026 at 13:07
hubcio added a commit that referenced this pull request May 1, 2026
`cargo doc --no-deps -p message_bus -p configs` flagged 14 broken or
private intra-doc links introduced by PR #3192. Sweep them all so the
docs build is clean for this PR's surface (one pre-existing
`ClusterConfig::validate` warning in core/configs/server_config remains;
it is outside this PR's scope).

  * `socket_opts`: was `pub(crate)`, four public-doc sites linked to
    it (`installer/{tcp_tls,wss}`, `client_listener/{tcp_tls,ws}`).
    Promote to `#[doc(hidden)] pub mod` so the cross-references resolve
    without enlarging the crate's public API surface in rustdoc output.
  * `lib.rs::with_config`: `[IggyDuration]` / `[IggyByteSize]` were
    bare names; qualify as `[iggy_common::IggyDuration]` /
    `[iggy_common::IggyByteSize]`.
  * `config.rs` module preamble: `[configs::server_ng_config::ServerNgConfig]`
    pointed at a private module path; the public re-export lives at
    `configs::server_ng::ServerNgConfig`. Same fix for
    `[IggyMessageBus::with_config]` -> `[crate::IggyMessageBus::with_config]`
    and the two `Iggy*` references.
  * `transports/tcp_tls.rs` module preamble: `[TlsPumpState]` /
    `[SharedAcc]` are private types - drop the bracket form, keep
    backticked code.
  * `replica/listener.rs::run`: `[handshake_read]` is private - same
    drop-the-brackets fix.
  * `configs::server_ng_config::websocket` module preamble:
    `[Self::to_tungstenite_config]` does not resolve at module-level
    `//!` doc; replace with `[WebSocketConfig::to_tungstenite_config]`.

Doc-only; no behavior change.
@hubcio hubcio force-pushed the feat/message-bus-multi-transport branch from 0ef2afc to 85504a3 on May 1, 2026 at 12:18
hubcio added 24 commits May 4, 2026 08:47
The message_bus client plane only spoke plaintext TCP, so SDK clients
behind TLS-only middleboxes or browser bridges had no way in, and the
bus could not interop with the rest of the server's transports.

Adds four new client listeners (QUIC, TCP-TLS, WS, WSS), each with a
bind/listen path, a per-connection installer, and a cancel-safe pump.
Every transport caps inbound peers with a bounded handshake_grace
(default 10s) so slowloris peers cannot pin install slots; WSS shares
one wall-clock budget across the TLS and WS handshakes. The QUIC
accept loop spawns per-incoming so a slow handshake no longer wedges
the listener. The replica listener gets the same bound: each accepted
stream runs its handshake on a spawned task under
compio::time::timeout, mirroring the client-plane defense, and
in-flight handshake handles are reaped opportunistically so the
handle vector stays bounded for the listener's lifetime.

MessageBusConfig now consumes the [message_bus] block of the server-ng
config schema directly via From<&ServerNgConfig>. IggyDuration ->
Duration, IggyByteSize -> usize, and the WS frame schema ->
tungstenite::WebSocketConfig conversions happen at the boot boundary
so hot paths read pre-converted fields. WS frame-layer tunables
(max_message_size, max_frame_size, write_buffer_size,
accept_unmasked_frames) live under [message_bus] so SDK-client burst
characteristics are tuned independently of the legacy [websocket]
listener. Forks tcp/websocket/quic into server_ng_config so server-ng
can evolve its TLS/WS/QUIC surface without disturbing the legacy
server.
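
A rough TOML sketch of the [message_bus] block this commit describes. The ws_* frame-layer keys and the 10 s handshake_grace default come from the text above; the remaining keys, exact spellings, and values are assumptions for illustration only, not the shipped schema:

```toml
# Hypothetical sketch -- key names follow the commit text where stated,
# values are illustrative.
[message_bus]
handshake_grace = "10 s"          # bounded handshake budget (default per commit)
close_grace = "5 s"               # assumed value; bounds TLS/WS shutdown
max_message_size = "10 MB"        # assumed per-bus frame cap

# WS frame-layer tunables, independent of the legacy [websocket] listener:
ws_max_message_size = "8 MB"      # assumed
ws_max_frame_size = "2 MB"        # assumed
ws_write_buffer_size = "128 KB"   # assumed
ws_accept_unmasked_frames = false
```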

Hardening that came up while writing the new paths:

- TLS reads are cancel-safe. The previous pump ran framing's
  read_message inside select_biased!; when the mailbox arm won
  mid-frame, the future dropped its scratch buffer while rustls had
  already advanced its plaintext queue past those bytes, and the
  next iteration parsed garbage as a fresh header. Replaced with a
  resumable read_step that issues exactly one tls.read.await per
  call, with the framing accumulator owned by a TlsPumpState on the
  pump's stack frame and shared via SharedAcc
  (Rc<UnsafeCell<Owned<MESSAGE_ALIGN>>>; !Send + !Sync; single
  shard, single mut-borrower invariant enforced by debug_asserts).
  The WS analogue is upstream-blocked on compio_ws 0.4 split() and
  is documented as a known limitation.
- compio::runtime::spawn silently swallows panics, which left a
  panicking on_message / on_request handler with an orphan registry
  slot. Replica + TCP dispatch tasks install scopeguards that
  synchronously evict the registry slot, fire
  notify_connection_lost (replica), and remove the client_meta
  (TCP). Async drain on clean exit observes the slot gone and
  becomes a no-op.
- Writer-task panic on TCP / QUIC previously left the reader parked
  on a live socket. Each writer task now installs a scopeguard that
  triggers the per-connection Shutdown on exit; the watchdog
  observes the conn-side fire, calls libc::shutdown(SHUT_RD), and
  the reader unparks. Two regression tests cover the eviction chain
  via a synthetic WriterPanicConn.
- TCP-TLS, WS, and WSS writers batch by draining the mailbox up to
  max_batch via try_recv after the first message, write each frame,
  then emit a single flush. Mirrors the plain-TCP writev pattern
  and avoids paying one TLS record + one flush per queued frame on
  a sustained burst.
- QUIC writer uses send.write_all(frozen) instead of send.write,
  so frame boundaries are not corrupted on backpressure. Drops the
  per-frame flush; quinn-proto coalesces STREAM frames on its own, and
  the explicit flush was serializing every send.
- Cluster coordinator snapshot rebroadcasts Clear frames for
  forgotten replica slots via a 3-state MappingSlot enum, so a
  peer that missed a clear-frame on a full inbox eventually
  re-syncs.
- Per-connection FusedShutdown folds bus-token and conn-token into
  one channel, dropping the bridge task that used to run per
  install.
- send_to_* fast path no longer clones the Frozen payload on the
  registry-hit branch; Bytes::from_owner removes a per-frame Vec
  alloc and memcpy on WS/WSS outbound.
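
The panic-cleanup guards above can be sketched with a plain Drop guard, using std threads in place of compio tasks; all names here are illustrative stand-ins, not the bus's real types:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Cleanup runs on *any* exit of the task body, including a panic the
/// runtime would otherwise swallow, because Drop fires during unwinding.
struct EvictOnDrop {
    evicted: Arc<AtomicBool>,
}

impl Drop for EvictOnDrop {
    fn drop(&mut self) {
        // Synchronously evict the registry slot, even while unwinding.
        self.evicted.store(true, Ordering::SeqCst);
    }
}

fn run_dispatch_task(evicted: Arc<AtomicBool>, panics: bool) {
    let _guard = EvictOnDrop { evicted };
    if panics {
        panic!("handler panicked");
    }
    // Clean exit: the guard still fires; a later drain sees the slot gone.
}

fn main() {
    let evicted = Arc::new(AtomicBool::new(false));
    let e = evicted.clone();
    // Run the panicking task on a thread so the panic does not abort main.
    let _ = std::thread::spawn(move || run_dispatch_task(e, true)).join();
    assert!(evicted.load(Ordering::SeqCst));
}
```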

Hygiene: tungstenite unified at 0.28 across the workspace (was 0.28
and 0.29 side by side); AcceptedQuicConn newtype owns the QUIC
accept callback shape so compio-quic types stay private; WebSocketConfig
is re-exported as message_bus::WebSocketConfig so callers do not need
a direct compio_ws dep; install_client_<transport> /
install_replica_tcp[_fd] renamed for symmetry; ShardFramePayload,
SendError, and the ClientConnMeta / conn_info types are
#[non_exhaustive].
…section

c9f5aeb removed the BLAKE3-keyed MAC + per-peer nonce ring that
gated inbound replica handshakes (~900 LOC under auth/) without
mention in the commit body. The promised LOGIN_REPLICA mitigation
does not yet exist anywhere in the tree. Drop a TODO at the listener
auth rustdoc so the gap is visible until the replacement lands.
Four rustdoc blocks claimed TCP_NODELAY + SO_KEEPALIVE apply to
accepted client sockets, but socket_opts::apply_nodelay_for_connection
sets only TCP_NODELAY. Replica<->replica liveness is observed by VSR
heartbeats and SDK clients manage their own keepalive policy at the
application layer, so omission is intentional, not a bug. Strip the
SO_KEEPALIVE claim and reference the schema rationale at
configs/src/server_ng_config/message_bus.rs:49-52 instead.
Reuse a single Vec<BusMessage> across iterations of the per-connection
pump in tcp_tls, ws, and wss. Each PumpAction::Send arm previously
allocated a fresh batch with capacity max_batch, then consumed it in a
for-in loop and dropped the allocation on every drain. Mirror the
plain-tcp writer (which already hoists via mem::take) by allocating
once outside the loop and clearing via drain(..). Eliminates one
heap allocation per drain on TLS-family transports, no semantic
change.
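
The reuse pattern can be sketched as follows; `BusMessage` and the pump shape are stand-ins, with a slice iterator playing the role of the mailbox:

```rust
/// Allocate the batch once outside the loop, refill it each iteration,
/// and consume via drain(..) so the heap allocation survives. An
/// into_iter() here would move the Vec out and defeat the hoist.
type BusMessage = Vec<u8>;

fn pump_drains_reusing_batch(frames: &[&[u8]], max_batch: usize) -> (usize, usize) {
    let mut batch: Vec<BusMessage> = Vec::with_capacity(max_batch);
    let mut flushes = 0;
    let mut written = 0;
    let mut it = frames.iter();
    loop {
        // Fill up to max_batch (stands in for recv + try_recv draining).
        while batch.len() < max_batch {
            match it.next() {
                Some(f) => batch.push(f.to_vec()),
                None => break,
            }
        }
        if batch.is_empty() {
            break;
        }
        // drain(..) empties the Vec but keeps its allocation for reuse.
        for msg in batch.drain(..) {
            written += msg.len();
        }
        flushes += 1; // one flush per drained batch
    }
    (written, flushes)
}

fn main() {
    let frames: [&[u8]; 5] = [b"aa", b"bbb", b"c", b"dddd", b"ee"];
    assert_eq!(pump_drains_reusing_batch(&frames, 2), (12, 3));
}
```
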
The TODO at the replica listener auth section and the SO_KEEPALIVE
clarifications in the four client-install paths cited paths with line
ranges and a session-local finding label. Both rot on the next
refactor and mean nothing to a future reader. Replace with
intra-doc links to crate::socket_opts and a TODO(hubcio): owner
prefix; fold the rationale into prose so the comment stands alone.
drive_close ran tls.flush() to completion before entering
compio::time::timeout(close_grace, tls.shutdown()), so a peer that
refused to drain ciphertext could stretch the close phase past the
configured grace. Wrap flush + shutdown in one timeout so a stalled
flush counts against the same wall-clock budget as the shutdown
step. Mirrors the pattern already used by the wss close path.
Hoisting the writer batch outside the pump loop in tcp_tls/ws/wss
trips clippy::iter_with_drain (nursery, warn-by-default for the
crate). Its suggestion to switch to into_iter() would move the Vec
out and defeat the hoist, since the allocation must survive the
iteration to be reused. Allow the lint at each call site with a
brief note on why drain(..) is the right call here.
Three small doc fixes surfaced in review:

- transports/mod.rs: rename "Batch atomicity" to "Batch ordering" and
  spell out that writev is not atomic; the invariant is FIFO + tear
  down on short/failed write. Mirror the same wording in the
  TransportConn::run trait doc.
- transports/quic.rs: rustdoc reader_task with the cancel-safety
  hazard (select! drops a parked framing::read_message and loses
  bytes already pulled into the in-flight Owned).
- transports/ws.rs: comment at the WS-payload copy now states why
  MESSAGE_ALIGN exists at all (io_uring O_DIRECT alignment in the
  storage path) instead of leaving the reader to infer it from the
  Message<GenericHeader> invariant alone.

No behaviour change.
The bus's QUIC transport_config used to hardcode max_idle_timeout,
keep_alive_interval, send/receive windows, and stream caps, while
[`server_ng_config::QuicConfig`] carried the matching schema fields
unconsumed.

Wire those knobs through:

- New `QuicTuning` runtime substruct on `MessageBusConfig`,
  pre-converted from `cfg.quic` in `From<&ServerNgConfig>` (mirrors
  the existing pattern for the WebSocket frame-layer tunables).
- New `transports::quic::transport_config_from(&QuicTuning)` that
  applies the operator-tunable knobs (windows, idle timeout,
  keep-alive, initial MTU, datagram send buffer) and clamps the
  architectural invariants (max_concurrent_uni_streams = 0, CUBIC
  congestion). Replaces `default_transport_config()`.
- `server_config_with_cert` now takes `&QuicTuning`. The replica
  bootstrap and the integration test thread it from
  `bus.config().quic` / `QuicTuning::default()` respectively.
- `core/server-ng/config.toml` `[quic]` defaults retuned to match the
  bus's previous hardcoded values (1 bidi stream, 64 MiB windows,
  30 s idle, 10 s keep-alive). The legacy server's QUIC defaults
  stay where they are; this section is server-ng only.

Zero `Duration` for `keep_alive_interval` / `max_idle_timeout` is
treated as "disabled" so the conversion never feeds quinn an
`IdleTimeout(0 ms)` that would tear every connection down.
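
The zero-means-disabled conversion reduces to a small total function; the name is illustrative, not the real conversion site:

```rust
use std::time::Duration;

/// A zero schema duration maps to None ("disabled") rather than a
/// 0 ms idle timeout that would tear every connection down.
fn idle_timeout_from_schema(d: Duration) -> Option<Duration> {
    if d.is_zero() { None } else { Some(d) }
}

fn main() {
    assert_eq!(idle_timeout_from_schema(Duration::ZERO), None);
    assert_eq!(
        idle_timeout_from_schema(Duration::from_secs(30)),
        Some(Duration::from_secs(30))
    );
}
```
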
The tcp_tls pump's `select!`-over-`tls.read` is sound only because
`compio_tls::TlsStream::read` drains through `futures-rustls`'s
`Stream::poll_read`, which leaves the rustls session intact on a
Pending-state cancel. That contract is library-version-sensitive
and silently breakable on a transitive bump.

Pin compio to `=0.18.0` so the futures-rustls minor cannot drift
under a routine `cargo update`, and add `tcp_tls_cancel_safe` as
the counterpart to `cancel_unsafe.rs`: cancel a Pending TLS read,
re-issue, assert the subsequent plaintext is delivered intact.
The two guards are cross-referenced in the tcp_tls module rustdoc
so a future reader hitting one tripwire knows where the other
lives.
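
The pin itself is one line of standard Cargo syntax; `=` is the exact-version operator, so a routine `cargo update` cannot float the dependency (and its futures-rustls transitive) to a newer minor:

```toml
[dependencies]
# Exact pin: "=" forbids semver-compatible drift under `cargo update`.
compio = "=0.18.0"
```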

Also extend `client_listener` with an explicit "Authentication
boundary" section: accept loops are deliberately authn-agnostic,
LOGIN is the sole authn point, and per-transport pre-LOGIN
gating (TLS / ALPN / 0-RTT rejection) is documented so operators
do not assume the listener layer enforces a policy that lives in
the dispatcher.
The `# F10 (WS / WSS) is unaffected by this fix` heading at run_pump
referenced an internal review tag. Project policy is for source to stand
alone without review refs or finding tags. Replace with prose that names
the actual transports (`crate::transports::ws`, `crate::transports::wss`)
and the upstream blocker (compio_ws lacking a split-reader plus
next_item API) so the comment's intent survives the review cycle.
Replace the generic "Auth" section with an explicit "Trust boundary
(regression vs prior MAC)" preamble. The current wording mixed two
distinct claims ("no transport-level auth" and "TLS is operator's
responsibility") and softened the security implications of the missing
BLAKE3-keyed MAC. The new wording:

  * names the regression: an attacker that learns the cluster id can
    register as any peer with a smaller replica id and feed forged
    consensus traffic until `server-ng` rejects at the app layer.
  * states the deployment requirement in MUST form: until LOGIN_REPLICA
    lands, operators run the replica port behind a trusted L2 boundary
    (private subnet, encrypted overlay, or physical isolation).
  * keeps the cross-reference to the in-source TODO covering MAC
    restoration.

Doc-only; no behavior change.
Four rustdoc statements diverged from the actual implementation. None
change behavior; all caused real reader confusion in review.

  * `transports/mod.rs` Batch ordering: claimed "Per-frame transports
    (WS, QUIC, TLS) do NOT batch". TCP-TLS / WS / WSS pumps in fact
    drain up to `max_batch` into a `Vec<BusMessage>` and then issue per-
    record writes followed by one trailing `flush()`. Only QUIC writes
    per frame. Distinguish three dispatch shapes: vectored batch (TCP),
    drain-and-flush (TCP-TLS / WS / WSS), per-frame (QUIC).

  * `client_listener/mod.rs` pre-LOGIN gating: claimed an operator may
    swap in a `rustls::ServerConfig` to opt into client certs and that
    QUIC enforces ALPN. The current `bind` API only accepts
    `TlsServerCredentials`, always builds with `with_no_client_auth`,
    and `transports::quic::server_config_with_cert` never sets
    `alpn_protocols` (sibling `client_listener::quic` rustdoc already
    states "no ALPN advertised"). State the actual contract: cert + key
    only, mTLS via fronting terminator, QUIC defers protocol-version
    validation to the LOGIN command.

  * `config.rs` module preamble + `ws_config` field doc: claimed the
    bus reads `cfg.websocket.to_tungstenite_config()`. The actual source
    is the bus-owned `ws_*` fields under `[message_bus]`, plumbed via
    `build_ws_config(bus)` inside `From<&ServerNgConfig>`. Misleading
    for anyone debugging WS frame ceilings. Point at the right block.

  * `transports/tcp_tls.rs` `with_close_grace`: claimed the override
    bounds only `TlsStream::shutdown`. Since 3c1f1c6 the budget covers
    both the trailing `flush()` and `shutdown()` sharing one timer.
    Match the module-level wording.
Schema and runtime defaults disagreed on byte units. The TOML used
SI ("100 KB" = 100 000, "8 KB" = 8 000) while QuicTuning::default()
used `100 * 1024` (102 400) and `8 * 1024` (8 192). Production loaded
through the schema (SI) and tests using `QuicTuning::default()` saw the
binary literals; integration tests like `quic_client_roundtrip` would
observe different datagram-send-buffer / MTU sizes than the deployed
server.

Pick KiB on the schema side: it matches the rest of the [quic] block
(`send_window` / `receive_window` already use MiB) and keeps the binary
semantics that the quinn-proto buffers actually expect.

Add `quic_tuning_default_matches_schema` in `message_bus::config` to
pin every QuicTuning field across the schema-derived and literal-default
paths so any future drift trips the test.

Existing `embedded_default_toml_deserializes_and_validates` still
passes; the schema unit change does not affect deserialization.
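
The divergence is easiest to see in numbers. This toy parser is illustrative only, not the configs crate's parser:

```rust
/// SI "100 KB" is 100_000 bytes; the binary literal 100 * 1024 is
/// 102_400. Switching the schema to KiB restores agreement with the
/// binary semantics the quinn-proto buffers expect.
fn parse_bytes(s: &str) -> u64 {
    let (num, unit) = s.split_once(' ').expect("expected '<n> <unit>'");
    let n: u64 = num.parse().expect("numeric prefix");
    match unit {
        "KB" => n * 1_000,   // SI
        "KiB" => n * 1_024,  // binary
        "MB" => n * 1_000_000,
        "MiB" => n * 1_048_576,
        _ => panic!("unknown unit"),
    }
}

fn main() {
    // Schema said "100 KB"; the runtime default used 100 * 1024.
    assert_eq!(parse_bytes("100 KB"), 100_000);
    assert_eq!(100 * 1024, 102_400);
    // KiB on the schema side matches the binary literals:
    assert_eq!(parse_bytes("100 KiB"), 100 * 1024);
    assert_eq!(parse_bytes("8 KiB"), 8 * 1024);
}
```
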
Cover three classes of misconfiguration that the previous validate()
did not reject:

  * Zero `close_grace` made every TLS shutdown / WS close drive_close
    expire on the first poll, dropping connections without flushing.
  * Zero `close_peer_timeout` skipped the writer/reader join phase
    entirely, leaking peer state on graceful shutdown.
  * Zero `reconnect_period` would tight-loop the outbound dialer.

Each now returns InvalidConfigurationValue with a named-field eprintln
matching the existing handshake_grace pattern.

Add a cross-field check on the WebSocket size chain:
ws_max_frame_size <= ws_max_message_size <= max_message_size. A
misordered configuration (e.g. frame_size > message_size) silently
caused tungstenite to reject every inbound frame at runtime; surface
the error at boot instead.

Add a debug_assert at `From<&ServerNgConfig> for MessageBusConfig` so
callers that build a `ServerNgConfig` by hand (tests, simulators) and
skip validation trip the assertion in dev builds. Production load via
`ServerNgConfig::validate()` already covers this path.

Six new unit tests pin the new validators (zero values + WS size chain
both directions, plus a happy-path ascending-order acceptance).
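
The cross-field size chain can be sketched as below, assuming the three fields exist with the meanings the commit gives them; the struct and error type are stand-ins for the real config types:

```rust
/// ws_max_frame_size <= ws_max_message_size <= max_message_size must
/// hold, else tungstenite silently rejects every inbound frame at
/// runtime; surface the misordering at boot instead.
struct WsSizes {
    ws_max_frame_size: usize,
    ws_max_message_size: usize,
    max_message_size: usize,
}

fn validate_ws_chain(c: &WsSizes) -> Result<(), String> {
    if c.ws_max_frame_size > c.ws_max_message_size {
        return Err("ws_max_frame_size > ws_max_message_size".into());
    }
    if c.ws_max_message_size > c.max_message_size {
        return Err("ws_max_message_size > max_message_size".into());
    }
    Ok(())
}

fn main() {
    let ok = WsSizes { ws_max_frame_size: 1, ws_max_message_size: 2, max_message_size: 3 };
    assert!(validate_ws_chain(&ok).is_ok());
    let bad = WsSizes { ws_max_frame_size: 4, ws_max_message_size: 2, max_message_size: 3 };
    assert!(validate_ws_chain(&bad).is_err());
}
```
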
Previously the runtime conversion in `core::message_bus::config::build_quic_tuning`
relied on `expect("...validated by schema")` for every numeric range
check, but no validator on `QuicConfig` existed. A misconfigured
`max_concurrent_bidi_streams` / `initial_mtu` / `receive_window` /
`send_window` / `datagram_send_buffer_size` would panic deep inside the
bus crate at boot rather than surface as a `ConfigurationError`.

Add `impl Validatable<ConfigurationError> for QuicConfig` covering:

  * `max_concurrent_bidi_streams >= 1` and fits in `u32` (quinn-proto
    stream count limit).
  * `datagram_send_buffer_size` fits in `usize` (compio-quic
    materializes a `Vec<u8>` of this length).
  * `initial_mtu >= 1200` (RFC 9000 §14 minimum) and fits in `u16`
    (quinn storage type).
  * `receive_window` fits in `u32` (quinn VarInt cap).
  * `send_window` <= 2^62 - 1 (quinn VarInt cap).

Wire `self.quic.validate()` into `ServerNgConfig::validate()` alongside
the existing section validators.

Replace `build_quic_tuning`'s `.expect(...)` arms with bounded
`unwrap_or` saturations so dev / test code that skips
`ServerNgConfig::validate()` no longer panics; production load runs
through the validator and the saturations never fire.

Eight unit tests pin each new range check (zero, above-u32 stream
count, MTU below QUIC minimum, MTU above u16, receive_window above u32,
send_window above VarInt cap, plus baseline + boundary acceptance).
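
The range checks reduce to the bounds the commit names; function and error shapes here are stand-ins for the real `Validatable` impl:

```rust
/// RFC 9000 §14 minimum MTU, quinn's u16 MTU storage, the u32 cap on
/// receive_window, and the 2^62 - 1 VarInt cap on send_window.
const QUIC_MIN_MTU: u64 = 1200;
const VARINT_MAX: u64 = (1 << 62) - 1;

fn validate_quic(initial_mtu: u64, send_window: u64, recv_window: u64) -> Result<(), String> {
    if initial_mtu < QUIC_MIN_MTU || initial_mtu > u16::MAX as u64 {
        return Err(format!("initial_mtu {initial_mtu} out of [1200, 65535]"));
    }
    if recv_window > u32::MAX as u64 {
        return Err(format!("receive_window {recv_window} exceeds u32"));
    }
    if send_window > VARINT_MAX {
        return Err(format!("send_window {send_window} exceeds VarInt cap"));
    }
    Ok(())
}

fn main() {
    assert!(validate_quic(1200, VARINT_MAX, u32::MAX as u64).is_ok()); // boundary acceptance
    assert!(validate_quic(1199, 0, 0).is_err());   // below QUIC minimum MTU
    assert!(validate_quic(70_000, 0, 0).is_err()); // above u16 MTU storage
    assert!(validate_quic(1500, VARINT_MAX + 1, 0).is_err()); // above VarInt cap
}
```
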
`Default for TcpSocketConfig` and `Default for MessageBusConfig`
hardcoded literals (100 KB recv/send buffers, max_batch=256, etc.)
instead of reading from the embedded `SERVER_NG_CONFIG` static_toml
struct. Sibling impls in the same file (`Default for TcpConfig`,
`Default for QuicConfig`, `Default for WebSocketConfig`) all source
their values from `SERVER_NG_CONFIG.<section>.<field>`; the bus + tcp
socket defaults were the lone outliers. The values matched today by
hand, but any future schema edit would silently drift the literals out
of sync, and the lock-step claim in the existing comments would
quietly stop being true.

Read all six `TcpSocketConfig` fields and the seven non-`ws_*_size`
`MessageBusConfig` fields directly from the schema. The optional
`ws_*_size` knobs stay `None` because the schema currently
comments them out (operators opt-in by uncommenting); when a schema
field becomes mandatory in a future PR the conversion will be a
one-liner here.

`max_batch` and `peer_queue_capacity` need an `as usize` cast because
static_toml emits integer literals as `i64`; the cast is safe given
the configs validator caps `max_batch` at `IOV_MAX_LIMIT_NG = 512`.
`IggyMessageBus::track_background` was push-only: every accept loop,
reconnect periodic, and per-WS-connect upgrade task that calls into it
appended its `JoinHandle` to a `Vec` that was never reaped until
`shutdown()`. The most visible source is `installer::install_client_ws_fd`,
which fires once per inbound WebSocket connection and never had a
matching cleanup. Over a long-running bus the vec grew unbounded
(one entry per WS connect since boot), wasting compio task-table
state for tasks that had long since completed.

Reap finished handles inside `track_background` itself, mirroring the
pattern in `client_listener::quic::run` and `replica::listener::run`
(where the accept loop already calls `handles.retain(|h| !h.is_finished())`
opportunistically before each accept). Centralising the reap here
covers every current and future spawn site that funnels through
`track_background`.

Compio's runtime is single-threaded, so `is_finished()` cannot flip
between the predicate evaluation and the drop inside the same
`retain` closure. Dropping a finished `JoinHandle` is a no-op.

No behavior change for code paths that already terminate quickly.
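
The reap-on-push pattern can be sketched with std threads standing in for compio tasks; `Tracker` and its method are illustrative names, not the bus API:

```rust
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Before tracking a new handle, drop the ones that already finished so
/// the vec stays bounded for the tracker's lifetime.
struct Tracker {
    handles: Vec<JoinHandle<()>>,
}

impl Tracker {
    fn track_background(&mut self, h: JoinHandle<()>) {
        // Dropping a finished std JoinHandle just detaches an
        // already-dead thread; unfinished handles are retained.
        self.handles.retain(|h| !h.is_finished());
        self.handles.push(h);
    }
}

fn main() {
    let mut t = Tracker { handles: Vec::new() };
    for _ in 0..32 {
        let h = thread::spawn(|| {});
        // Give each short-lived task a chance to finish before the push.
        thread::sleep(Duration::from_millis(5));
        t.track_background(h);
    }
    // Prior handles were reaped on push; only recent ones can remain.
    assert!(t.handles.len() <= 2);
}
```
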
Add `IggyMessageBus::background_tasks_len()` behind
`#[cfg(any(test, debug_assertions))]` so tests can pin the leak-fix
invariant without exposing the underlying `RefCell<Vec<JoinHandle<()>>>`
to production callers.

Two new assertions:

  * `track_background_reaps_finished_handles_on_push` (unit) — drives
    the runtime between 32 spawns so each prior handle has a chance to
    finish before the next `track_background` call. The post-loop vec
    must contain at most one outstanding handle (the final spawn that
    has not yet been polled), proving the reap-on-push invariant.

  * `replica_panic_evicts_registry_and_notifies_loss` (integration)
    gains a post-`bus.shutdown(...)` assertion that
    `bus.background_tasks_len() == 0`. This is the canonical leak gate
    for the panic-cleanup suite: every client install path funnels
    through `install_client_conn`, so the shared cleanup invariant
    trips here first if a future change leaks a handle.
`QuicTransportConn::run` drained both spawned tasks by awaiting their
JoinHandles back-to-back before issuing `Connection::close(QUIC_SHUTDOWN, _)`.
With no wall-clock budget, a stuck reader (parked on
`RecvStream::read_chunk` after the peer goes silent) or a stuck writer
(parked on a backpressured dispatch send) keeps the run future alive
indefinitely, the peer never receives the CONNECTION_CLOSE frame, and
the bus's shutdown drain blocks waiting for the run handle.

Wrap both joins in `compio::time::timeout(close_grace, ...)`. On elapse
we fall through to `connection.close(...)` regardless, mirroring the
existing TCP-TLS / WSS bounded-close shape. The FusedShutdown observer
inside both spawned tasks already wakes them on bus / per-conn
shutdown so the elapse path stays rare in production.

Add a `close_grace: Duration` field to `QuicTransportConn` plus a
`with_close_grace` builder method (matching `TcpTlsTransportConn`).
Plumb `MessageBusConfig::close_grace` through `installer::install_client_quic`
so production deployments use the operator-configured budget; tests
construct via the default. Default mirrors `tcp_tls::DEFAULT_CLOSE_GRACE`.

Drop now-unused `IggyByteSize` / `IggyDuration` / `Duration` imports
from `configs/server_ng_config/defaults.rs` (made dead by an earlier
commit on this branch that wired the bus and tcp_socket defaults to
read from `SERVER_NG_CONFIG`).
`drive_close` for the WSS transport sent the WebSocket Close frame and
flushed tungstenite's write buffer, but never drove the inner TLS
layer's shutdown. The implicit Drop on `WebSocketStream<TlsStream<TcpStream>>`
tore down the TCP socket without ever issuing a `close_notify`, leaving
strict-checking peers (rustls itself, conformant proxies) to observe a
TLS truncation rather than a clean session end. RFC 8446 §6.1 treats
that as a hazard.

Asymmetric with `tcp_tls::drive_close`, which already calls
`tls.shutdown().await` to send `close_notify` before letting the TCP
layer FIN.

Add a third step to the WSS close sequence: after `ws.close(None)` and
`ws.flush()`, call `ws.get_mut().shutdown().await`. `get_mut()`
borrows the inner `TlsStream` without consuming the `WebSocketStream`,
so the WS layer's Drop still runs after `drive_close` returns. All
three steps share the existing `close_grace` budget; on elapse the
function returns early and Drop closes whatever remains, matching the
prior elapsed-warning shape.

Pull `compio::io::AsyncWrite` into scope so `shutdown()` resolves on
the inner TLS stream.
`ShardZeroCoordinator::delegate_client` and `delegate_ws_client` had
zero unit-test coverage. The existing `delegate_replica_*` test
exercised the replica path end-to-end (real loopback TCP fd, dup,
ShardFrame inspection on the target shard), but neither client path
was checked, so the meta-tag invariant
(`ClientTransportKind::Tcp` vs `ClientTransportKind::Ws`) was held
only by the call-site assignment.

Add two `#[compio::test]` mirrors:

  * `delegate_client_ships_setup_with_meta_transport_tcp` asserts the
    target inbox receives `ClientConnectionSetup` with `meta.transport
    == Tcp` and the round-robin client_id encodes a valid target shard.
  * `delegate_ws_client_ships_setup_with_meta_transport_ws` does the
    same for `ClientWsConnectionSetup` and `ClientTransportKind::Ws`.

Both tests share the loopback-TCP-pair fixture pattern of the existing
replica test so the dup, peer_addr, and try_send paths run against a
real fd. The tests catch any future change that mismatches the
ShardFramePayload variant or the meta tag.
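The loopback fixture those tests share can be approximated with plain std networking. This is a sketch only; per the commit, the real tests additionally dup the fd and inspect the resulting ShardFrame on the target shard.

```rust
use std::net::{TcpListener, TcpStream};

// Minimal loopback-TCP-pair fixture: yields two connected real-fd streams,
// so peer_addr and fd-level operations run against a live socket.
fn loopback_pair() -> std::io::Result<(TcpStream, TcpStream)> {
    let listener = TcpListener::bind("127.0.0.1:0")?; // port 0: OS picks a free port
    let addr = listener.local_addr()?;
    let client = TcpStream::connect(addr)?;
    let (server, _peer) = listener.accept()?;
    Ok((client, server))
}
```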

Five clippy warnings introduced by this branch's earlier doc edits:

  * config.rs: rephrase the QuicTuning conversion docstring so MTU
    bound and `receive_window` bound do not start lines with
    `>= ` / `<= ` markers (clippy parses them as malformed Markdown
    blockquotes). Also fix the now-stale `unwrap_or_else` reference
    (we moved to `unwrap_or` saturations earlier).
  * replica/listener.rs: backtick `WireGuard` (clippy missing-backticks).
  * transports/quic.rs: backtick `close_grace` in the
    with_close_grace test docstring.
  * transports/mod.rs: blank line before the trailing summary item in
    the Batch ordering bullet list (clippy doc-list-item indent).

No semantic change; doc-only.
@hubcio hubcio force-pushed the feat/message-bus-multi-transport branch from 85504a3 to e5b8c84 on May 4, 2026 06:47
hubcio added 2 commits May 4, 2026 09:12
WS/WSS run_pump destructured ActorContext with `..`, dropping
max_message_size; decode_consensus_frame hardcoded
framing::MAX_MESSAGE_SIZE. TCP/TCP-TLS/QUIC all thread the per-bus
cap into framing::read_message / read_step, so an operator that
lowered max_message_size got enforcement on TCP/QUIC and silent
bypass on WS/WSS.

Thread the cap into decode_consensus_frame and capture it in both
run_pump bodies. Add per-transport regression tests
(ws_rejects_oversize_against_custom_cap,
wss_rejects_oversize_against_custom_cap) that build a frame above a
custom cap and assert the server pump exits without delivering
the frame to in_rx.
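The enforcement shape those regression tests assert can be sketched as follows; the names are illustrative, since the real check lives inside `decode_consensus_frame` and the WS/WSS `run_pump` bodies.

```rust
// A frame whose declared size exceeds the per-bus cap is rejected and the
// pump exits without delivering it; in-cap frames are forwarded.
fn pump_step(
    declared_size: u32,
    max_message_size: u32,
    delivered: &mut Vec<u32>,
) -> Result<(), &'static str> {
    if declared_size > max_message_size {
        return Err("frame exceeds max_message_size; pump exits");
    }
    delivered.push(declared_size);
    Ok(())
}
```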

Three production read sites (framing::read_message, tcp_tls::read_step,
ws::decode_consensus_frame) each carried a `48..52` magic offset for
the size field. framing.rs had a `const _:()` guard pinning
offset_of!(GenericHeader, size) == 48; tcp_tls.rs and ws.rs relied on
that guard living in a separate module, so a refactor that touched
framing.rs would silently disarm the safety net.

Hoist the offset to iggy_binary_protocol::SIZE_FIELD_OFFSET next to
GenericHeader, move the offset_of! assertion into the existing layout
guard block on the type, and add an inline read_size_field(&[u8])
helper that returns Option<u32>. Replace all three production reads
plus the test write sites in framing.rs, tcp.rs, and quic.rs with the
shared helper / constant.

No wire change. The const _: () in framing.rs is removed because the
guard now lives next to the field it protects.
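A sketch of that shared helper and relocated guard; the header layout and the little-endian read here are illustrative assumptions, not the real `GenericHeader` definition in iggy_binary_protocol.

```rust
use std::mem::offset_of;

// Illustrative header: 48 bytes of preceding fields, then the size field.
#[repr(C)]
struct GenericHeader {
    _prefix: [u8; 48],
    size: u32,
}

const SIZE_FIELD_OFFSET: usize = 48;

// Layout guard pinned next to the field it protects, so a refactor of the
// header cannot silently disarm the former 48..52 read sites.
const _: () = assert!(offset_of!(GenericHeader, size) == SIZE_FIELD_OFFSET);

// Shared read helper replacing the per-file magic slices; returns None when
// the buffer is too short to contain the size field.
fn read_size_field(buf: &[u8]) -> Option<u32> {
    let bytes = buf.get(SIZE_FIELD_OFFSET..SIZE_FIELD_OFFSET + 4)?;
    Some(u32::from_le_bytes(bytes.try_into().ok()?))
}
```

Keeping the `const _: ()` assertion adjacent to the field means a layout change fails to compile in the same file that caused it, instead of corrupting size reads at three distant call sites.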
@numinnex numinnex merged commit e937852 into master May 4, 2026
80 checks passed
@numinnex numinnex deleted the feat/message-bus-multi-transport branch May 4, 2026 12:21
Standing-Man pushed a commit to Standing-Man/iggy that referenced this pull request May 6, 2026
…nts (apache#3192)
