Skip to content

[multistage] Sender-side gRPC back-pressure for mailbox + MAILBOX_CLIENT_USED_* gauges (draft)#18519

Open
gortiz wants to merge 39 commits into
apache:masterfrom
gortiz:mse-backpressure
Open

[multistage] Sender-side gRPC back-pressure for mailbox + MAILBOX_CLIENT_USED_* gauges (draft)#18519
gortiz wants to merge 39 commits into
apache:masterfrom
gortiz:mse-backpressure

Conversation

@gortiz
Copy link
Copy Markdown
Contributor

@gortiz gortiz commented May 18, 2026

Why

Customer hit OutOfDirectMemoryError inside MessageFramer.writeRaw from GrpcSendingMailbox.sendContent:

failed to allocate 4194304 byte(s) of direct memory (used: 25163727127, max: 25165824000)
  at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(...)
  ...
  at io.grpc.internal.MessageFramer.writeRaw(MessageFramer.java:294)
  ...
  at org.apache.pinot.query.mailbox.GrpcSendingMailbox.sendContent(GrpcSendingMailbox.java:227)

Root cause: GrpcSendingMailbox calls StreamObserver.onNext(...) unconditionally on every chunk. The runtime object is a ClientCallStreamObserver exposing isReady() / setOnReadyHandler(), but we don't use either. The receiver-side MailboxStatusObserver already writes back buffer-size metadata to the sender, but it is explicitly thrown away:

// pinot-query-runtime/.../channel/MailboxStatusObserver.java
// TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.

When the receiver drains slower than the sender writes (slow consumer, full mailbox queue, large fan-out, skewed hash shuffle, etc.) the proto chunks pile up in gRPC Netty's outbound queue until the JVM-wide direct-memory cap is hit.

Shape of this PR

This PR ships the mechanism, configs, and observability — production behaviour is unchanged.

The two back-pressure features (sender-side isReady gate, receiver-side manual inbound flow control) both default to off. The OutOfDirectMemoryError failure mode is unchanged from pre-PR for any cluster that doesn't explicitly opt in. Operators who hit the OOM flip two booleans — pinot.query.runner.grpc.sender.backpressure.enabled and pinot.query.runner.grpc.manual.inbound.flow.control.enabled — both to true, optionally raise pinot.query.runner.grpc.inbound.message.credit from its conservative default of 1, and they get the bounded-sender behaviour the rest of this PR description describes.

Transport-tuning defaults (HTTP/2 stream window, Netty WriteBufferWaterMark) are raised from gRPC/Netty's KB-scale defaults to MB-scale — those don't add any per-send cost and are useful regardless of whether the gate is engaged. They can also be tuned per-config if an operator wants to dial them back.

Reproducer / test

GrpcSenderBackpressureTest (in pinot-query-runtime/src/test/.../mailbox/) stands up two real MailboxService instances on localhost (full gRPC stack — not mocked), spawns a slow reader thread polling the receiving mailbox every 20 ms (~50 blocks/s), and runs a fast sender loop on the test thread (sender.send(block) in a tight loop with the same small RowHeapDataBlock reused). The new MAILBOX_CLIENT_USED_* gauges (see "Observability" below) are printed so the behaviour is visible in CI output. The test enables both back-pressure features explicitly in its PinotConfiguration.

Pre-fix behaviour (no protection): sender outruns reader by ≥1858×, client direct memory grows by tens of MB without bound.

Opt-in (feature enabled) behaviour: sender stays roughly in step with reader (~35× ratio — the residual is the HTTP/2 window's worth of in-flight messages before isReady() flips false), client direct memory bounded.

Two companion tests pin the rest of the matrix:

  • GrpcSenderBackpressureOffPathTest — same scenario with the gate at its production default (off); asserts the pre-fix unbounded sender behaviour is preserved so the off-path is a true no-op.
  • GrpcSenderBackpressureTightGateTest — feature enabled but transport configs shrunk to KB-scale, so the application gate (rather than the wider transport) is the dominant back-pressure mechanism. Asserts the tight polledCount * 50 + 10_000 bound that a no-gate refactor would fail.

What

Three layers of mechanism that compose, all introduced by this PR:

1. Sender-side application gate (GrpcSendingMailbox)

Casts _contentObserver to ClientCallStreamObserver, registers an onReadyHandler via ClientResponseObserver.beforeStart (gRPC rejects late registration), and blocks the calling thread on a Condition until isReady() flips or the mailbox is terminated.

  • MailboxStatusObserver becomes a ClientResponseObserver, signalling the sender's Condition on transport-ready transitions and on early-terminate metadata so any receiver-side event can wake a stuck sender.
  • GrpcSendingMailbox.awaitReady(boolean bypassReady) is the new wait. Normal sends wait for isReady() with the query deadline as timeout; cancel/close sends pass bypassReady=true so a cancel issued while the receiver is congested can still push its error EOS through without re-blocking on the very back-pressure it's trying to escape. Pokes QueryThreadContext.checkTerminationAndSampleUsage on each iteration so query cancellation can unblock through the existing MSE path.
  • _readyLock (the ReentrantLock that already guards _readyCond) now serializes every call to _contentObserver.onNext / onCompleted across sendContent, send(Eos), cancel, and close — closing a family of races where the data path and a concurrent cancel could both call onNext on the non-thread-safe ClientCallStreamObserver. Two regression tests in GrpcSendingMailboxTest reproduce the race on unfixed code (lazy-init race + EOS-vs-cancel half-close race) and confirm the lock closes them.
  • Spool: when a stage spools to multiple destination mailboxes via BlockExchange.BlockExchangeSendingMailbox, one slow downstream worker throttles the whole spool. This is intentional — the alternative (forking each destination onto its own thread) would re-introduce the unbounded outbound queue we're fixing. The javadoc on awaitReady calls this out so it doesn't surprise anyone.
  • Opt-in: pinot.query.runner.grpc.sender.backpressure.enabled (default false). Setting to true engages the gate. Operators who don't hit the OOM see no behaviour change.

2. Transport-layer tuning (GrpcMailboxServer + ChannelManager)

The default gRPC HTTP/2 stream window is 65 535 bytes initial, BDP-tuned to ~1 MiB on localhost/LAN. At those sizes isReady() flips false on almost every non-trivial chunk, so the gate's slow path (park/wake on Condition) engages essentially per-send. Widening the transport pipeline keeps isReady() true for whole blocks at a time. Three new configs (all in pinot-spi/CommonConstants.MultiStageQueryRunner):

Config Default Layer
pinot.query.runner.grpc.flow.control.window.bytes 64 MiB HTTP/2 stream window the receiver advertises (NettyServerBuilder.flowControlWindow)
pinot.query.runner.grpc.write.buffer.high.water.mark.bytes 64 MiB Netty per-channel WriteBufferWaterMark high (sender outbound queue)
pinot.query.runner.grpc.write.buffer.low.water.mark.bytes 32 MiB Netty per-channel WriteBufferWaterMark low

These three are the only defaults this PR changes that have any runtime effect by default. They affect throughput (not safety): wider window/watermark means fewer round-trips per request and a higher direct-memory bound (watermark × #peers on the sender, window × #concurrent inbound streams on the receiver). The Javadoc on each key carries the scaling formula so operators can size against -XX:MaxDirectMemorySize. Fail-fast Preconditions.checkArgument validation at GrpcMailboxServer / ChannelManager constructors rejects misconfiguration at startup (negative watermarks, low > high, window < maxInboundMessageSize) rather than letting it surface mid-query.

3. Manual inbound flow control (GrpcMailboxServer + MailboxContentObserver)

gRPC's default auto-inbound-flow-control issues request(1) after every MailboxContentObserver.onNext returns. That ties the sender's HTTP/2 window replenishment to the receiver's per-message processing time (including the offerRaw lock inside the application queue). Even with the wider 64 MiB window the sender ends up parked in awaitReady for most chunks because only one message is in flight on a stream at a time.

When this feature is enabled, the receiver switches to manual inbound flow control:

  • GrpcMailboxServer.open casts the response observer to ServerCallStreamObserver, calls disableAutoInboundFlowControl(), and request(N) once at stream open.
  • MailboxContentObserver.onNext calls request(1) at the very start of the method — before the offerRaw lock — so credit is replenished while the sender is still on the wire, not after the receiver is done processing.

Two new configs:

Config Default Meaning
pinot.query.runner.grpc.manual.inbound.flow.control.enabled false Opt-in. When true, the receiver uses the manual-flow path above. When false, gRPC's auto-inbound applies — 1 message in flight per stream, pre-PR behaviour.
pinot.query.runner.grpc.inbound.message.credit 1 Conservative default — equivalent to gRPC auto when the manual-flow flag is on. Operators tune up for throughput on small/medium blocks; the bench measures it at 128.

Cancel-propagation latency widens with the credit value (the in-band error EOS sits behind buffered inbound up to min(credit messages, flowControlWindow bytes)). Issue #18541 tracks the proper out-of-band cancel work — relevant if operators set a large credit.

Observability (MAILBOX_CLIENT_USED_* gauges)

BrokerGauge and ServerGauge gain two entries each, sourced from ChannelManager._bufAllocator.metric():

Gauge Source
MAILBOX_CLIENT_USED_DIRECT_MEMORY usedDirectMemory()
MAILBOX_CLIENT_USED_HEAP_MEMORY usedHeapMemory()

Mirrors the existing MAILBOX_SERVER_USED_* gauges that GrpcMailboxServer publishes for the inbound (server) allocator. Closes the gap that today we have no per-process visibility into the gRPC client outbound pool — which is exactly the pool that exhausts in this failure mode.

MailboxService registers them in its constructor, picking BrokerMetrics vs ServerMetrics based on InstanceType. Both direct and heap are reported so the numbers stay meaningful regardless of -Dio.netty.noPreferDirect. Four matching @VisibleForTesting accessors (getMailbox{Client,Server}Used{Direct,Heap}MemoryBytes()) let tests and other internal callers read the values without going through the metrics registry.

Benchmark

BenchmarkGrpcMailboxSend was rewritten to a request-shape probe: one @Benchmark invocation ships 128 MiB split into pre-computed RowHeapDataBlocks of _blockSizeBytes, and waits on a CountDownLatch for the receiver-side drain to complete (drainer thread blocks on the receiver's Reader::blockReadyToRead callback via a Semaphore). The bench explicitly enables both back-pressure features and sets credit to 128 in its PinotConfiguration — these numbers measure the feature-enabled regime, not the production default. Time per request, ms/op (lower is better):

Block FlowCtl Window gate=on (ms) gate=off (ms) Δ (gate cost)
8 KiB 64 KiB 354.0 ± 19.7 348.7 ± 52.9 +1.5%
8 KiB 1 MiB 341.9 ± 78.3 332.9 ± 32.0 +2.7%
8 KiB 64 MiB 344.7 ± 46.5 353.1 ± 24.6 −2.4%
8 MiB 64 KiB 116.4 ± 28.2 114.2 ± 47.6 +1.9%
8 MiB 1 MiB 57.9 ± 4.0 46.7 ± 19.7 +24.0%
8 MiB 64 MiB 49.3 ± 31.2 32.5 ± 1.7 +51.7%
32 MiB 64 KiB 136.3 ±126.6 122.9 ±142.9 +10.9%
32 MiB 1 MiB 71.8 ± 9.7 50.1 ± 7.4 +43.3%
32 MiB 64 MiB 55.6 ± 11.2 38.1 ± 1.5 +46.2%

gate=off is the sender-backpressure kill-switch (current default + pre-fix code path). gate=on is what an operator opting into the feature gets.

The pre-PR steady-state on localhost is gate=off + window=1 MiB (gRPC's BDP-tuned default): 46.7 ms for 8 MiB blocks ≈ 2.7 GiB/s. Opt-in gate=on + window=64 MiB: 49.3 ms ≈ 2.6 GiB/s — ~5% slower than pre-PR steady-state and ~2× faster than pre-PR cold-start (window=65535, 114 ms).

The gate's wall-clock cost on large blocks comes from awaitReadyCondition.await while the per-channel watermark drains. Async-profiler wall-clock confirms 105 / 401 jmh-worker samples are in that exact path; the cost is pure waiting, not lock contention. Switching the gate to an async / setOnReadyHandler-driven path was considered and rejected: with the watermark in place, an async chunk queue would behave like a larger window — it would recover throughput but at the cost of more in-flight memory, which is the very thing we're bounding.

Backwards compatibility

  • New gauges: additive only — no rename or removal of existing gauges.
  • New MailboxService accessors: @VisibleForTesting, additive.
  • New CommonConstants.MultiStageQueryRunner config keys: additive. Both opt-in flags default to false; runtime behaviour is unchanged for any cluster that doesn't explicitly set them. The three transport-tuning defaults (window + 2× watermark) are raised from KB-scale to MB-scale; this changes throughput characteristics but no semantics, and operators can dial them back via config.
  • No on-wire protocol change. No existing-config rename. Rolling upgrades unaffected — a fixed sender talking to an old receiver is just the existing behaviour (the new isReady()-gated wait is purely client-local; the new transport configs are independently effective per side).

Test plan

  • GrpcSenderBackpressureTest — feature opt-in path with wide defaults, sender stays bounded, ratio ~35×.
  • GrpcSenderBackpressureTightGateTest — feature opt-in with narrow transport, asserts the application gate alone bounds the sender.
  • GrpcSenderBackpressureOffPathTest — production-default off path; asserts the pre-fix unbounded sender behaviour is preserved verbatim.
  • GrpcMailboxServerValidationTest, ChannelManagerTest — startup Preconditions.checkArgument gates: inboundMessageCredit > 0, writeBufferLowWaterMarkBytes > 0, low <= high, flowControlWindow >= maxInboundMessageSize. Each verified to fail-then-pass against unfixed/refixed production code.
  • GrpcSendingMailboxTest includes two new race regression tests — lazy-init race (sender + cancel from t=0, exactly-one observer created) and EOS-vs-cancel half-close race (200 iterations, neither call throws). Both verified to reproduce the bug on pre-fix code.
  • All other pinot-query-runtime/src/test/.../mailbox/ tests still pass (MailboxServiceTest, MailboxContentObserverTest, InMemorySendingMailboxTest). The testRemoteCancelledBySender* tests caught a real bug in the first cut of the fix (awaitReady short-circuiting cancel's own EOS) — fixed by the bypassReady flag.
  • spotless:apply clean, checkstyle:check 0 violations on touched modules.
  • CI green on apache/pinot.

🤖 Generated with Claude Code

… reproducer

Customers have hit `OutOfDirectMemoryError` inside `MessageFramer.writeRaw`
from `GrpcSendingMailbox.sendContent`: the sender pushes proto chunks faster
than the receiver drains, and gRPC's outbound queue fills the JVM's direct
memory pool. The root cause is that `GrpcSendingMailbox` calls
`StreamObserver.onNext` unconditionally — it does not respect gRPC's
`ClientCallStreamObserver.isReady()` hook and ignores the receiver-side
buffer-size feedback (see the explicit TODO in `MailboxStatusObserver`).

This change does not yet add the backpressure fix. It adds the observability
and a reproducer test so we can iterate on the design:

* New `MAILBOX_CLIENT_USED_DIRECT_MEMORY` and `MAILBOX_CLIENT_USED_HEAP_MEMORY`
  gauges on both `BrokerGauge` and `ServerGauge`, registered by
  `MailboxService` against the gRPC client allocator that backs every
  `GrpcSendingMailbox`. They mirror the existing `MAILBOX_SERVER_USED_*`
  gauges that `GrpcMailboxServer` already publishes for the inbound pool.
* `MailboxService` exposes `getMailbox{Client,Server}Used{Direct,Heap}MemoryBytes()`
  accessors returning the same values the gauges report.
* New `GrpcSenderBackpressureReproTest`: a fast-sender thread pushes the
  same small block over and over against a slow-reader thread polling every
  20 ms. Within 3 seconds the sender pushes ~1700x more blocks than the
  receiver polls and grows the client pool by tens of MB. When sender-side
  flow control is added, the test should be inverted to assert sender and
  reader stay roughly in step.
@gortiz gortiz requested a review from yashmayya May 18, 2026 17:21
@gortiz gortiz changed the title [multistage] Expose MAILBOX_CLIENT_USED_* gauges and add gRPC backpressure reproducer (draft) [multistage] Sender-side gRPC back-pressure for mailbox + MAILBOX_CLIENT_USED_* gauges (draft) May 18, 2026
@gortiz gortiz marked this pull request as ready for review May 18, 2026 18:20
@yashmayya yashmayya added enhancement Improvement to existing functionality multi-stage Related to the multi-stage query engine memory Related to memory usage or optimization labels May 18, 2026
Gate every `MailboxContent` chunk on `ClientCallStreamObserver.isReady()`,
blocking the calling (query-runner) thread until the gRPC outbound has room
again. This is the fix for the OOM stack trace documented in the previous
commit: with no `isReady()` gate, a sender that outpaces its receiver fills
gRPC's Netty outbound queue until the JVM-wide direct-memory pool is gone.

Mechanics:

* The observer passed to `stub.open(...)` is now an anonymous
  `ClientResponseObserver` that wraps `_statusObserver`. It registers
  `setOnReadyHandler` in `beforeStart()` — gRPC rejects late registration,
  so the cast-after-open approach did not work. The wrapper delegates the
  data callbacks unchanged and additionally signals `_readyCond` on
  `onError` / `onCompleted` so a sender blocked in `awaitReady` wakes up
  when the receiver-side stream closes. `MailboxStatusObserver` itself is
  unchanged.
* `GrpcSendingMailbox` owns a `ReentrantLock` + `Condition`. `awaitReady()`
  waits on that condition with the existing query deadline as the timeout,
  and pokes `QueryThreadContext.checkTerminationAndSampleUsage` on each
  iteration so query cancellation can unblock a stuck sender through the
  same path used elsewhere in the MSE.
* `cancel`/`close` thread a `bypassReady` flag through `processAndSend` /
  `sendContent`. The cancel path's own error EOS must still reach the
  receiver even when the outbound is congested, so it skips the wait. Normal
  user sends keep going through the gate.
* Spool note in the javadoc: a slow downstream throttles the whole spool by
  design; we are not forking each destination onto its own thread.

Test:

`GrpcSenderBackpressureReproTest` is inverted from "demonstrate the bug" to
"demonstrate the fix". Same setup (fast sender vs slow reader at ~50 polls/s)
but now asserts the sender does NOT massively outrun the receiver, and the
client allocator does not grow without bound. A watchdog timer issues
`sender.cancel` after the budget elapses so the loop exits cleanly within a
bounded wall-clock time. Sample numbers from a local run:

  pre-fix:  sent=197 022  polled=  106  ratio=1858.7x  pool growth=54 MB
  post-fix: sent=  4 225  polled=  119  ratio=  35.5x  pool growth= 8 MB

The remaining 35x is the gRPC HTTP/2 stream window's worth of in-flight
messages before `isReady()` flips false. Tightening that further is a
follow-up (per-channel Netty write-buffer watermark) and is left for the
next iteration.

Existing mailbox tests pass, including the two that exercise
`cancel`/`close` (`testRemoteCancelledBySender*`), which broke initially
when `awaitReady` short-circuited cancel's own send — fixed by the
`bypassReady` plumbing.
@gortiz gortiz force-pushed the mse-backpressure branch from 02d5654 to bcbe25b Compare May 18, 2026 18:39
gortiz added 2 commits May 18, 2026 21:01
New config key `pinot.query.runner.grpc.sender.backpressure.enabled` (default
true) plumbed through `MailboxService` into every `GrpcSendingMailbox`. When
disabled, `awaitReady()` short-circuits like the cancel/close bypass path and
the sender pushes unconditionally — restoring the pre-fix behaviour. Purpose:

  * production kill-switch in case the back-pressure gate causes an unexpected
    regression that the test suite did not catch;
  * A/B knob for `BenchmarkGrpcMailboxSend` so we can measure the gate's
    per-call overhead on identical code paths.

`GrpcSendingMailbox` keeps its original constructor as a thin delegate that
passes `backpressureEnabled = true`, so existing callers (notably
`GrpcSendingMailboxTest`) are unaffected.
`BenchmarkGrpcMailboxSend` exercises the steady-state send path under a
fast spin-poll drainer. Two `@Param` axes:

  * `_backpressureEnabled` (`true`, `false`) wired via the new
    `pinot.query.runner.grpc.sender.backpressure.enabled` config key. Lets us
    measure the gate's per-call overhead against pre-fix behaviour without
    recompiling or maintaining two branches.
  * `_payloadBytes` (`16`, `256`, `16384`, `1048576`) covers the regimes the
    customer's failure mode actually hits — small messages where the gate's
    fast path dominates (a single volatile `isReady()` read) and large
    messages that exceed the gRPC HTTP/2 stream window so the slow-path
    park/wake on the `Condition` is exercised.

Run with `java -cp pinot-perf/target/pinot-perf-pkg/lib/* \
  org.openjdk.jmh.Main org.apache.pinot.perf.BenchmarkGrpcMailboxSend` —
the class's own `main` constructs `OptionsBuilder` for default runs.

Local quick-run results (1 warmup × 3 s + 2 measurement × 3 s × 1 fork) have
not been collected yet; left to the reviewer / CI to run with full JMH
defaults for publication-quality numbers.
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented May 18, 2026

Codecov Report

❌ Patch coverage is 71.57895% with 54 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.28%. Comparing base (2910445) to head (f6d24f8).
⚠️ Report is 38 commits behind head on master.

Files with missing lines Patch % Lines
...apache/pinot/query/mailbox/GrpcSendingMailbox.java 74.76% 20 Missing and 7 partials ⚠️
...va/org/apache/pinot/spi/utils/CommonConstants.java 4.16% 23 Missing ⚠️
...org/apache/pinot/query/mailbox/MailboxService.java 86.95% 0 Missing and 3 partials ⚠️
...he/pinot/query/mailbox/channel/ChannelManager.java 85.71% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18519      +/-   ##
============================================
+ Coverage     63.75%   64.28%   +0.53%     
+ Complexity     1932     1137     -795     
============================================
  Files          3292     3314      +22     
  Lines        201470   204255    +2785     
  Branches      31316    31785     +469     
============================================
+ Hits         128442   131306    +2864     
+ Misses        62735    62413     -322     
- Partials      10293    10536     +243     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.28% <71.57%> (+0.53%) ⬆️
temurin 64.28% <71.57%> (+0.53%) ⬆️
unittests 64.28% <71.57%> (+0.53%) ⬆️
unittests1 56.74% <71.57%> (+0.94%) ⬆️
unittests2 35.52% <10.00%> (+0.26%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

yashmayya added 3 commits May 18, 2026 13:41
The termination poll inside awaitReady would abort an in-flight error EOS
mid-flight, leaving _senderSideClosed false and letting the subsequent cancel
path replace the original error code (e.g. SERVER_RESOURCE_LIMIT_EXCEEDED)
with QUERY_CANCELLATION on the receiver. EOS blocks are small and carry
control-plane info, so they should never be back-pressured.
- Mark _contentObserver volatile so cancel() / close() on a different
  thread (OpChain on-failure callback, watchdog in tests) sees the
  sender's lazy initialization with proper happens-before semantics.
- Fix gPRC -> gRPC typo in close() error message.
Copy link
Copy Markdown
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice mechanism overall — ClientResponseObserver.beforeStart + setOnReadyHandler + a Condition for the slow path is the right shape, and the bypass-for-EOS reasoning around preserving the original error code is well thought through. A few concurrency and API-hygiene items worth tightening before merge — comments inline.

Comment thread pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java Outdated
gortiz added 5 commits May 19, 2026 08:50
Three concurrency fixes in GrpcSendingMailbox addressing review feedback:

1. send(MseBlock.Eos): set _senderSideClosed = true BEFORE
   _contentObserver.onCompleted(). Old order let a concurrent cancel()
   land in the window between the two statements, see isTerminated()
   still false, and call onNext(errorBlock) on a stream the sender had
   already half-closed -- gRPC throws IllegalStateException.

2. sendContent (data-block path): re-check isTerminated() after
   awaitReady() returns true and before onNext(content), only on the
   non-bypass path. Narrows -- but does not fully close -- the same race
   shape for data sends where a concurrent cancel could race onNext on
   the non-thread-safe ClientCallStreamObserver. The cancel/close paths
   use bypassReady=true and must still send their own error EOS, so the
   re-check is gated.

3. ClientResponseObserver wrapper: call wakeWaiters() in onNext() after
   delegating to _statusObserver. The receiver's early-terminate signal
   (MAILBOX_METADATA_REQUEST_EARLY_TERMINATE) is delivered as a
   MailboxStatus onNext -- NOT a stream close -- so a sender blocked in
   awaitReady would not wake until either the stream closed (it does
   not, for early-terminate) or the deadline elapsed. Also add
   isEarlyTerminated() to awaitReady's fast-path and loop-exit
   predicates so the sender bails out of the wait once the flag flips.
ChannelManager and GrpcMailboxServer no longer leak
io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric through
their public API. Replaced each getBufAllocatorMetric() with narrow
usedDirectMemoryBytes() / usedHeapMemoryBytes() longs -- which is all
MailboxService and the gauges ever consumed.

Also annotated the four MailboxService.getMailbox{Client,Server}Used*
accessors with @VisibleForTesting. They are only called by the
GrpcSenderBackpressure* tests and the BenchmarkGrpcMailboxSend bench,
and duplicate what the registered gauges expose -- the annotation makes
that scope intent explicit so we do not accidentally grow the public
surface.
…path

Previously the class Javadoc claimed a `@Param drainSleepMicros` that
throttled the drainer, but no such field existed -- the drainer
spin-polled with Thread.onSpinWait(). At every payload size the drainer
kept isReady() == true permanently, so the A/B between
backpressureEnabled = true/false measured only one extra volatile read
on the fast path; the slow-path park/wake the docstring promised was
never hit.

Add the missing param (`@Param({"0", "100"})` µs) and use
LockSupport.parkNanos in the drainer to actually throttle it. Updated
the class Javadoc to describe the two regimes accurately:
 * drainSleepMicros = 0: drainer keeps the receiver empty; A/B isolates
   the gate's fast-path overhead (one volatile isReady() read).
 * drainSleepMicros > 0: drainer lags, gRPC outbound fills, isReady()
   flips false, sender parks on the Condition. A/B now includes the
   park/wake cost.
…gate

The new `pinot.query.runner.grpc.sender.backpressure.enabled` flag has
no CI coverage today -- the JMH bench sweeps both values but JMH does
not run in CI. A refactor that broke the
`bypassReady || !_backpressureEnabled` short-circuit in
GrpcSendingMailbox.awaitReady would pass every existing test silently.

GrpcSenderBackpressureDisabledTest mirrors GrpcSenderBackpressureTest's
fast-sender / slow-reader setup but builds the MailboxService with the
flag off, then asserts sendCount > polledCount * 10. With the gate
properly disabled the ratio is roughly 5000x; with the gate still
active (regression) it would collapse to single-digit-x and the
assertion would fail.

Sample local output:
  [GrpcSenderBackpressureDisabledTest] sent=576649 polled=106 ratio=5440.1x
The test was written as a bug repro -- it asserted "sender massively
outruns receiver" to prove the OOM-precursor existed. After the gate
landed, the assertions were inverted to "sender stays roughly in step
with receiver"; the file is now a validator, not a repro.

 * Rename class + file: GrpcSenderBackpressureReproTest ->
   GrpcSenderBackpressureTest. No other files in the repo referenced the
   old name.
 * Update class Javadoc to describe what is validated, not what was
   reproduced. Removed the dangling reference to grpc-oom-analysis.md
   (a local-only analysis file, never committed).
 * Test method: fastSenderOutpacesSlowReceiver ->
   senderObservesBackpressureFromSlowReceiver, matching the current
   assertion direction.
 * Updated the [GrpcSenderBackpressureReproTest] string in the printf
   tag to match the new class name.
@gortiz
Copy link
Copy Markdown
Contributor Author

gortiz commented May 19, 2026

I've run the benchmark and performance drops sensibly when the feature is enabled. The reason is that the http2+netty window is tiny. I'm going to make it configurable and run the tests again

gortiz added 4 commits May 19, 2026 10:39
The wrapping ClientResponseObserver was calling wakeWaiters() on every
MailboxStatus from the receiver. Transport-level isReady() transitions
already reach a parked sender through setOnReadyHandler (registered in
beforeStart), and normal buffer-size ACKs do not change any predicate
that awaitReady() actually waits on. So signalling them was forcing a
spurious park/unpark cycle on every receiver ACK.

The one status-only change awaitReady() must still observe is
early-terminate (the stream stays open, so onError/onCompleted never
fire). Keep wakeWaiters() in that path; skip it otherwise.
Sender-side back-pressure has been on `ClientCallStreamObserver.isReady()`,
gating every chunk. With gRPC's HTTP/2 stream window starting at ~64 KB
(BDP-tuned to ~1 MB) and Netty's `WriteBufferWaterMark` defaulting to
32 KB low / 64 KB high, `isReady()` was flipping false mid-block for
any normal-sized MSE block. The slow path parked the sender on a
Condition on essentially every send, which cost ~6x throughput in the
JMH bench versus the kill-switch-off baseline.

Widen the transport limits instead:

  * gRPC HTTP/2 stream window: default 64 MB, configured via
    `NettyServerBuilder.flowControlWindow(N)` in `GrpcMailboxServer`.
    Per HTTP/2 stream; the server advertises it as inbound credit so
    the sender knows how much it can pipeline.
  * Netty per-channel WriteBufferWaterMark: default 32 MB low /
    64 MB high, configured via
    `ChannelOption.WRITE_BUFFER_WATER_MARK` in `ChannelManager`. Per
    (host, port) channel, shared across all streams to that peer.

Three new config keys in `CommonConstants.MultiStageQueryRunner`:

  * pinot.query.runner.grpc.flow.control.window.bytes
  * pinot.query.runner.grpc.write.buffer.high.water.mark.bytes
  * pinot.query.runner.grpc.write.buffer.low.water.mark.bytes

Sender direct-memory footprint is now bounded by
`watermark.high × #peers` (= 64 MB × #peers with defaults), which is
well under the JVM direct-memory cap on a typical Pinot cluster. A
TODO in `MailboxService` flags that an additional application-level
global byte budget would tighten the bound to a single cap across all
peers — worth doing if we see fan-outs across hundreds of peers per
query, otherwise the transport-level cap is sufficient.

The existing `isReady()` gate stays as the chunk-level safety net (it
still defends against pathological multi-tens-of-MB blocks and against
the case where the receiver is severely backed up). The
`pinot.query.runner.grpc.sender.backpressure.enabled` kill-switch is
unchanged — it still flips `awaitReady` off as today.

Test:

  * `GrpcSenderBackpressureTest` recalibrated. With the wider pipeline
    the sender pace runs ~2240x the receiver poll rate (was ~35x); the
    threshold is now generously `polledCount * 7000`. Client-pool
    growth holds at 8 MB (down a hair from 12 MB).
  * `GrpcSenderBackpressureDisabledTest` unchanged: kill-switch path
    still races to a 5400x ratio.
  * `ChannelManagerTest` updated to pass the two new watermark args.
  * `BenchmarkGrpcMailboxSend` adds a `_flowControlWindowBytes`
    `@Param` so we can A/B across the old tiny default, the BDP-tuned
    1 MB, and the new 64 MB defaults.

All 39 mailbox-related tests pass.
…redit

gRPC's default auto-inbound-flow-control issues `request(1)` AFTER each
`MailboxContentObserver.onNext` returns. That ties the sender's HTTP/2
window replenishment to the receiver's per-message processing time —
including the offerData lock acquisition inside the application queue.
Even with a wide 64 MB HTTP/2 stream window, only one inbound message
is in flight at a time on a stream, and the bench showed the sender
parked in `awaitReady` on most sends because of this.

Switch the receiver to manual inbound flow control:

  * `GrpcMailboxServer.open` casts `responseObserver` to
    `ServerCallStreamObserver`, calls `disableAutoInboundFlowControl`,
    and `request(N)` once at stream open. N is configurable via the new
    `pinot.query.runner.grpc.inbound.message.credit` key (default 128).
  * `MailboxContentObserver.onNext` calls `request(1)` at the very
    start of the method, before any work that might block (offerRaw /
    lock acquisition). Keeps the credit window at N regardless of how
    long the application takes to process the message.

Receiver memory exposure is still bounded by the existing HTTP/2 stream
window (default 64 MB) — whichever of `N × max-msg-size` and the byte
window is smaller. For small payloads N binds (negligible memory); for
large payloads the byte window binds (same cap as today).

`Preconditions.checkArgument` guards against operators setting the
credit to 0 or negative (which gRPC would otherwise reject with an
opaque `IllegalArgumentException` at first-stream-open time).

Tests: all 42 mailbox tests pass. `GrpcSenderBackpressureDisabledTest`
ratio rose from ~5440x to ~6450x because the now-larger inbound
credit lets the kill-switch path push even more before transport
back-pressure engages — that's expected and within the existing
`> 10x` assertion. `GrpcSenderBackpressureTest` ratio is unchanged
from its post-transport-widening number; recalibration of its
threshold is left for the next bench run.
Replace the hot-loop "send one block per @benchmark op" measurement with
a request-shaped probe: each @benchmark invocation ships TOTAL_BYTES
(128 MiB) of pre-computed `RowHeapDataBlock`s and waits on a
`CountDownLatch` for the receiver-side drain to complete. Closer to the
unit Pinot actually cares about (one query's data movement) than
throughput of a single tiny block measured in a hot loop.

Axes:
  * `_blockSizeBytes` ∈ {8 KiB, 8 MiB, 32 MiB} — replaces the old
    `_payloadBytes`. The three sizes span "tiny" (16384 blocks/req),
    "one gRPC chunk" (16 blocks/req), and "split across ~4 chunks by
    `toByteStrings`" (4 blocks/req).
  * `_backpressureEnabled` — unchanged.
  * `_flowControlWindowBytes` — unchanged.
  * `_drainSleepMicros` removed — drainer is now blocking on a Reader
    callback, so a sleep axis no longer makes sense.

Drainer reform:
  * Receiver registers its `Reader::blockReadyToRead` callback to
    release a `Semaphore`; the drainer thread blocks on
    `Semaphore.acquire` and then non-blocking-polls everything
    available. No `Thread.onSpinWait` busy loop.
  * The drainer counts down a per-invocation `CountDownLatch` so the
    @benchmark thread can synchronise on "every block reached the
    receiver mailbox" before returning.

Mode → `AverageTime` / `MILLISECONDS`, warmup 2×5s, measurement 3×10s.
@gortiz gortiz added the metrics Related to metrics emission and collection label May 19, 2026
The class-level comment was using the Markdown javadoc syntax (`///`)
but kept the old HTML annotations (`<h2>`, `<em>`) and `{@code}` /
`{@link}` tags from the original Javadoc style. Inside a `///` comment
those HTML tags render as literal `<h2>` text in JDK 25's Markdown
renderer.

Replace:
  * `<h2>title</h2>`  →  `## title`
  * `<em>x</em>`      →  `*x*`
  * `{@code x}`       →  `` `x` ``
  * `{@link Foo#bar}` →  `[Foo#bar]`

Also fixes a stale reference: `GrpcSenderBackpressureReproTest` (now
`GrpcSenderBackpressureTest` after the rename in 1f0e802).
Copy link
Copy Markdown
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second-pass review after the round-1 inline comments were addressed and two new architectural commits (e1c8459a60 widen-pipeline, dfce8a42cb manual inbound flow control) were pushed.

Round-1 verdicts (re-verified against the latest code): M2 partially-fixed (EOS race closed, data-path race narrowed but still reachable — comment below), M3/M5/M7/M8/M9 confirmed-fixed.

Round-2 focus is whether the new transport defaults still solve the direct-memory OOM the PR was built to fix. The mechanism is now layered: Netty WriteBufferWaterMark on the sender + HTTP/2 flow-control window on the receiver + manual inbound credit with eager replenishment + the original awaitReady application gate as a chunk-level safety net. The combination is correct and bounded, but the defaults are generous enough that on a moderate fan-out (~50 peers) the per-process bound approaches typical -XX:MaxDirectMemorySize ceilings, and the regression test no longer exercises the application gate at default config. Comments below address that plus a couple of related behavioral regressions (data-path race, cancel propagation latency).

Nice work on the rewrite — the bench/test changes and the @VisibleForTesting annotations are exactly what was needed.

// transport-layer WriteBufferWaterMark already caps sender direct memory per (host, port)
// channel, so the OOM the original PR fixed is bounded by watermark.high × #peers. A global
// byte budget would tighten the bound to a single configurable cap across all peers, but is
// unnecessary unless fan-outs hit hundreds of peers per query.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TODO downplays the risk this PR was built to address. Pinot deployments with 50–100 servers are common, and the bound here is writeBufferHighWaterMark × #peers on the sender plus flowControlWindow × #incoming_streams on the receiver. With the 64 MB defaults that's ~3.2 GiB/sender at 50 peers, ~6.4 GiB/sender at 100 peers, and a symmetric amount per concurrent inbound stream on the receiver — multiple concurrent queries multiply this. That's well within reach of the original failure mode on a moderate fan-out, not "hundreds of peers per query".

Two concrete asks:

  1. Convert this to a tracked issue with a quantitative trigger condition, so it doesn't rot as a comment.
  2. Add an operator-facing scaling formula to CommonConstants.MultiStageQueryRunner Javadoc: peak_sender_direct_memory ≈ writeBufferHighWaterMark × #peers, peak_receiver_direct_memory ≈ flowControlWindow × #incoming_streams. Without that, operators have no way to size watermark vs -XX:MaxDirectMemorySize for their topology, and the defaults will deceive.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I originally included this on the PR, but it just made it more complex. It is also difficult to trace total in-flight memory, as the natural way to track this is to increase a counter when a block is sent and reduce it when the receiver sends a status message, but that means we measure at block level, not chunk/message level.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created #18539. Anyway, note that multiplying these numbers by the number of concurrent opchains and servers is unrealistic.

/// before signalling back-pressure. As a result, [GrpcSendingMailbox.awaitReady] rarely fires the
/// application-level gate during a 3-second run; the back-pressure that *does* apply is transport-level
/// (the kernel's TCP send buffer and the receiver's gRPC server read loop). This means the ratio of
/// `sendCount` to `polledCount` is much larger than with the old narrow-window defaults, and the test
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The honest admission here ("awaitReady rarely fires the application-level gate") is correct — and it's also why this test no longer guards what it was built to guard. With the new transport defaults a 128-byte payload at 50 polls/sec for 3 seconds simply cannot fill a 64 MB sender write-buffer or a 64 MB HTTP/2 stream window, so the application gate never engages. The 140× relaxation (polledCount * 50 + 10_000polledCount * 7000 at line 210) means a future refactor that deletes the entire setOnReadyHandler + ClientResponseObserver.beforeStart + awaitReady machinery would still pass this test, because the kernel TCP send buffer alone bounds the ratio well under 7000× at this payload size.

Suggest a companion GrpcSenderBackpressureTightGateTest that overrides the new transport keys to small values via PinotConfiguration:

Map.of(
  "pinot.query.runner.grpc.flow.control.window.bytes", "65535",
  "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes", "262144",
  "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes", "131072"
)

with the original tight assertion (polledCount * 50 + 10_000). That makes the application gate the dominant back-pressure mechanism, so the test fails if awaitReady regresses. GrpcSenderBackpressureDisabledTest already exercises the kill-switch-off case; this would complete the matrix.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in commit 2d8fe69320 as GrpcSenderBackpressureTightGateTest, with exactly the config matrix you proposed:

Map.of(
    KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 65535,
    KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES, 262144,
    KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES, 131072,
    // Y3 fail-fast validation requires window >= maxInboundMessageSize, so shrink that too:
    KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, 65535)

Tight assertion: sendCount <= polledCount * 50 + 10_000 (the original bound from before the wide-defaults relaxation). With narrow transport caps a deletion of the application gate would fail this loudly. Class-level Javadoc references the three-test matrix (GrpcSenderBackpressureTest — wide-defaults regime, GrpcSenderBackpressureDisabledTest — kill-switch off, this one — narrow transport with the gate as the dominant back-pressure mechanism).

The KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES override was added in commit 1d29438dc0 ("Fail fast on invalid gRPC mailbox transport configuration") — the Y3 startup-time check I added (flowControlWindow >= maxInboundMessageSize) immediately caught this test setup as misconfigured on first run; shrinking both together to 65 535 satisfies it.

_maxInboundMessageSize = maxInboundMessageSize;
_idleTimeout = idleTimeout;
_writeBufferHighWaterMarkBytes = writeBufferHighWaterMarkBytes;
_writeBufferLowWaterMarkBytes = writeBufferLowWaterMarkBytes;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two validation gaps that fail lazily instead of at startup:

  1. WriteBufferWaterMark(low, high) throws IllegalArgumentException if low > high, but only inside getChannel on the first send to a previously-unseen peer (lines 103-104, 116-117). A misconfigured cluster starts up fine, exposes metrics, and then explodes mid-query.
  2. No positivity check; a 0 or negative watermark would be accepted by Netty as effectively always-writable / always-unwritable — not what operators expect.

Worth adding Preconditions.checkArgument(0 < writeBufferLowWaterMarkBytes && writeBufferLowWaterMarkBytes <= writeBufferHighWaterMarkBytes, ...) here, matching the existing pattern at GrpcMailboxServer.java:149-152 for _inboundMessageCredit. Same shape applies to flowControlWindow >= maxInboundMessageSize in GrpcMailboxServer — a window smaller than the max message size makes the stream unusable, and there's currently no check.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 1d29438dc0. Two validations added:

  1. ChannelManager: eagerly constructs the WriteBufferWaterMark in the constructor (so Netty's own 0 <= low <= high check fires at startup, not on first getChannel), plus an explicit Preconditions.checkArgument(writeBufferLowWaterMarkBytes > 0, …) for the low = 0 degenerate case Netty accepts. The two int fields collapsed into a single _writeBufferWaterMark field reused across all peers.

  2. GrpcMailboxServer: Preconditions.checkArgument(_flowControlWindowBytes >= maxInboundMessageSize, …), sitting next to the existing _inboundMessageCredit > 0 check.

Both match the precondition-with-config-key-name error-message style used for _inboundMessageCredit.

Side effect: the new validation caught GrpcSenderBackpressureTightGateTest on first run — it was overriding flowControlWindow to 65535 without also shrinking maxInboundMessageSize (default 16 MiB). The test now shrinks both together so the application gate, not the new validation, is what bounds the sender. The fact that the validation flagged a real (test-time) misconfiguration on first run is reassuring — exactly what fail-fast is for.

// offerData lock acquisition inside _mailbox.offerRaw). This decouples the sender's HTTP/2 window
// replenishment from the receiver's per-message processing time — gRPC will issue the WINDOW_UPDATE
// for this message as soon as this request(1) call sets the credit, not waiting for onNext to return.
_responseObserver.request(1);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Subtle but worth surfacing: this credit replenishment runs before the blocking _mailbox.offerRaw at line 85. The comment explains the intent (decouple HTTP/2 window from per-message processing time) and the throughput rationale is reasonable, but architecturally this means the receiver replenishes credit independently of application drain rate. If the downstream ReceivingMailbox queue (cap 5) fills and the dispatch thread parks in _notFull.await, the sender keeps shipping bytes into the receiver's Netty inbound buffer up to the 64 MB stream window — i.e. receiver-side direct memory pinned per stalled stream is bounded by flowControlWindow, not by application progress.

That is the intentional design and it works as long as flowControlWindow × concurrent_streams stays well below -XX:MaxDirectMemorySize. Two suggestions:

  1. Add // Do not move this below the blocking call so a future maintainer doesn't "fix" the apparent ordering oddity and re-couple credits to drain rate.
  2. Spell out in the KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES Javadoc that this value is the per-stalled-stream receiver-side direct-memory exposure, not just a throughput knob — so operators sizing it understand the OOM-bound implication.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in commit b5d19714ae. Above the _responseObserver.request(1) call there's now a blunt warning:

// Do not move this below the blocking _mailbox.offerRaw call — it must replenish credit
// BEFORE the offer so the receiver doesn't gate the sender on per-message application drain time.
_responseObserver.request(1);

And in CommonConstants.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES Javadoc I spelled out that this value is the per-stalled-stream receiver-side direct-memory exposure (not just a throughput knob): when an inbound stream's application queue stalls, the wire can still buffer up to flowControlWindow bytes on that stream before the HTTP/2 peer stops sending. Operators sizing the window have the OOM-bound implication called out explicitly.

// does not fully eliminate) that window; fully eliminating it would require serializing all onNext() calls
// under _readyLock, which is more invasive. The bypass path (cancel/close) must push through regardless,
// so this guard only applies when bypassReady == false.
if (!bypassReady && isTerminated()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Round-1 review flagged this race; the EOS path was correctly closed in commit 573356ee74 by reordering _senderSideClosed = true before onCompleted(). This data-path mitigation is narrower — the isTerminated() re-check reduces but doesn't eliminate the window where two threads can call onNext on the same non-thread-safe ClientCallStreamObserver. Under heavy cancel-while-sending workloads the race surfaces as either an IllegalStateException from gRPC ("call already closed") or two concurrent onNext calls on a stream gRPC does not contractually promise to be thread-safe.

The clean fix is to hold _readyLock across awaitReady() + onNext(). The fast path of awaitReady is already lock-free (returns on isReady() outside the lock), so the extra acquire is paid only when the gate would have parked anyway — negligible. Alternative: keep the current approach but elevate this paragraph to class-level Javadoc so it's not lost in the middle of sendContent.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in c39e12f1a4 by going with your "clean fix" — _readyLock (the existing ReentrantLock already used for _readyCond) now serializes every outbound observer call. Three sites are now lock-protected:

  1. sendContent: lock spans the post-awaitReady isTerminated() re-check and the onNext. awaitReady itself stays outside the lock — its slow path takes _readyLock internally for _readyCond.await, so wrapping it would just nest. The fast isReady() == true path remains lock-free.
  2. send(MseBlock.Eos, …): lock spans _senderSideClosed = true + onCompleted so the success-path close is atomic against any racing sendContent.
  3. cancel: lock spans processAndSend(errorBlock, bypassReady=true) + onCompleted. The inner sendContent re-acquires via ReentrantLock's same-thread reentry, no-op cost.

The previous in-code paragraph that explained the narrow-window mitigation is gone — the race is no longer narrow, it's closed. Class-level Javadoc gained a one-liner: "_readyLock serializes every call to _contentObserver.onNext / onCompleted. Acquire it around any new outbound call site you add." so future maintainers don't accidentally add an unprotected fourth site.

Rationale for picking the full fix over the "elevate the comment to class-level Javadoc and file a follow-up" alternative: this PR already paid the equivalent cost in 573356ee74 for the symmetric cancel-vs-EOS race. Leaving the data-path variant behind a re-check that visibly doesn't close it (and writing that fact down) felt like the wrong invariant to ship. The cost of the lock is one uncontended lock()/unlock() per chunk — tens of ns against onNext, no measurable bench impact.

Tests: 40/40 mailbox tests still pass, including testRemoteCancelledBySender* in GrpcSendingMailboxTest (which is exactly the cancel-vs-send race scenario).

ServerCallStreamObserver<Mailbox.MailboxStatus> serverCallObserver =
(ServerCallStreamObserver<Mailbox.MailboxStatus>) responseObserver;
serverCallObserver.disableAutoInboundFlowControl();
serverCallObserver.request(_inboundMessageCredit);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Behavioral regression worth documenting (and ideally testing): with auto inbound flow control the receiver saw one in-flight message at a time. With this change the in-flight window per stream becomes min(_inboundMessageCredit messages, flowControlWindow bytes)min(128, 64 MB) at defaults. That widens cancel-propagation latency proportionally: when the sender calls cancel() and pushes an error EOS (bypassReady=true), the EOS is appended to the same HTTP/2 stream behind any messages already in the inbound buffer. The receiver dispatches each through MailboxContentObserver.onNext before observing the cancel, and if the application queue is full each dispatch parks in _notFull.await. A cancelled query can take noticeably longer to actually stop while up to 64 MB of buffered data is drained.

Two recommendations:

  1. Register setOnCancelHandler on the ServerCallStreamObserver here. That gives the server an out-of-band signal to call _mailbox.cancel() and wake the parked dispatch thread immediately, independent of the stream-ordered EOS.
  2. Add a test that issues a cancel while the receiver is parked on a full queue and asserts the receiver mailbox terminates within a tight bound (e.g. < 100 ms), independent of how many buffered messages remain on the stream.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two-part response landed in acdd1e3266:

1. Rollback knob (pinot.query.runner.grpc.manual.inbound.flow.control.enabled, default true).
Lets operators flip the receiver back to gRPC's auto-inbound-flow-control (1 in-flight message at a time, no disableAutoInboundFlowControl() call, no manual request(1) in onNext) without rebuilding. The rollback path matches the pre-PR receiver behaviour verbatim and is exercised in MailboxContentObserverTest (new testOnNextDoesNotReplenishCreditWhenManualFlowControlDisabled asserts request(anyInt()) is never called when the flag is false). This shrinks the worst-case cancel-latency window back to where it was before this PR, at the cost of the throughput improvement the manual flow control buys.

2. GH issue #18541MSE: out-of-band cancel propagation for stuck receivers.
Documents the underlying hang (in-band EOS gated by a parked dispatch thread on a full application queue) and explicitly notes it's pre-existing — the rollback path above doesn't fix it either, just shrinks the window. Two design options written up in the issue:

  • Option 1: setOnCancelHandler + sender stream.cancel(). Faster, but the in-band error EOS can be dropped → loss of specific QueryErrorCode on the receiver.
  • Option 2 (preferred): dedicated Mailbox.Cancel(mailboxId, errorPayload) unary RPC. Out-of-band, preserves error code fidelity, slightly more work (proto + handler + sender wiring). This is the route the user suggested when we discussed.

Also included: a concrete test recipe (cancel-while-receiver-parked-on-full-queue, assert termination within ~100 ms) that would fail today and pass after either option.

Going with documentation + rollback + issue in this PR rather than implementing the out-of-band cancel here: the proper fix is a non-trivial behavioural change to the cancel contract, and the issue Yash flagged is bounded (the rollback gives operators a release valve, and the hang it would re-expose predates this PR). Issue #18541 carries the design for the proper fix as a focused follow-up.

KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT Javadoc now spells out the cancel-latency tradeoff (min(credit messages, flowControlWindow bytes) of buffered inbound to drain before the EOS reaches the application) and links #18541. GrpcSendingMailbox.cancel(Throwable) Javadoc carries the same link.

Tests: 41/41 mailbox tests still pass, including the new manual-flow-disabled test.

Copy link
Copy Markdown
Contributor

@Jackie-Jiang Jackie-Jiang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch!

We are introducing a lot of new configs. What is the guideline for them? Do you foresee tuning each of them individually?

*/
public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
"pinot.query.runner.grpc.flow.control.window.bytes";
public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024 * 1024;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we align this with pinot.query.runner.max.msg.size.bytes (16MB by default)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we shouldn't. Using the same value as the default would mean that when large blocks are sent, a single message can be in-flight, which would penalize the performance. Benchmarks clearly show that 64MB is better for large messages than smaller values. In fact, larger values could produce a better throughput (at the cost of making more likely to get out of direct memory).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be clear: This is the buffer the sender has to send bytes until the receiver acks them. So it marks how many bytes we may have in-flight before blocking the sender. Having a larger value than pinot.query.runner.max.msg.size.bytes means that we can have more than 1 message in-flight, which is desired. The more messages we can have in-flight, the closer we are to the previous state, where the sender never blocks but keeps buffering off-heap memory. This increases performance at the cost of stability.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better explaination from Claude:

  • flowControlWindow (64 MB): per-stream HTTP/2 inbound byte window the receiver advertises. Caps bytes in flight per stream
    before the sender has to wait for WINDOW_UPDATE.
  • writeBufferHighWaterMark (64 MB): per-channel Netty outbound queue size on the sender. Once the queue grows past this,
    Channel.isWritable() flips false and the application gate parks.
  • writeBufferLowWaterMark (32 MB): the drain threshold that flips isWritable() back to true.

flowControlWindow and writeBufferHighWaterMark are aligned at 64 MB by design — they cap roughly the same conceptual thing (one peer's worth of in-flight bytes) from the two ends of the wire. We could collapse them into a single config, but I prefer keeping them separate so operators can tune them independently if their workload calls for it (e.g. an asymmetric one-way fan-in).

writeBufferLowWaterMark intentionally is not equal to high — they're a hysteresis pair, not a single threshold. Netty marks the channel unwritable when the outbound queue grows above high, and writable again only when it drains below low. If we set them equal, the channel would oscillate between writable/unwritable on every byte once the queue is around that size, forcing constant park/wake on the application gate under sustained pressure. Half-of-high is the standard hysteresis ratio.

Comment thread pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java Outdated
*/
public static final String KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT =
"pinot.query.runner.grpc.inbound.message.credit";
public static final int DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT = 128;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean we allow 128 gRPC messages to queue up on the sender side? How is this 128 calculated? This is way higher than the default blocking queue size, which is 5

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this means exactly that.

The max number of queued up messages will be 128 + the 5 we have in the blocking queue, but:

  • The ones in the blocking queue are heap allocated, while the ones in gRPC should be off-heap (depending on the gRPC implementation)
  • Their total size in bytes is still limited by the window size (DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES).

This is useful for small blocks that are sent very rapidly (which is probably not a common case in Pinot). Before this change, the number of messages in-flight was limited to 1. This wasn't that problematic because we buffered indefinitely on the sender side, but with this feature, it blocks the sender (the window isn't considered ready).

/// Notice we are wiring the shaded gRPC Netty allocator
/// ([io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator]) rather than
/// the non-shaded one.
private void registerMailboxClientGauges() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to also emit the server side memory usage?

Copy link
Copy Markdown
Contributor Author

@gortiz gortiz May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have that in MAILBOX_SERVER_USED_*, registered in GrpcMailboxServer. Probably worth following up on whether gauges give us enough spike fidelity — scrape-interval polling will miss short-lived bursts. A histogram or a max-since-last-scrape gauge would catch those, but either way means churn on existing metric names and neither of these concepts is supported by our PinotMetric abstraction

gortiz added 6 commits May 20, 2026 09:41
…g Javadocs

Reviewer Y1: the three new gRPC mailbox transport-tuning keys were documented
as throughput knobs without making the direct-memory bound explicit. Operators
sizing -XX:MaxDirectMemorySize need the per-knob formula stated, not just the
qualitative "scales with peers / streams" language.

For each of the three keys add an operator-facing scaling formula:

- KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES
  Peak receiver direct memory ≈ flowControlWindow × #concurrent_incoming_streams.

- KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES
  Peak sender direct memory ≈ writeBufferHighWaterMark × #peers
  (one channel per peer, shared across streams to that peer).

- KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES
  References the high-water mark's formula and explains the (high − low) gap
  is drain hysteresis (does not change the peak bound; controls reopen rate).

Also state explicitly on each that these are direct-memory bounds, not just
throughput knobs, and must be sized against -XX:MaxDirectMemorySize.
…t hard to miss

Reviewer Y4: the existing comment above _responseObserver.request(1) in
MailboxContentObserver.onNext explains the *why* of replenishing credit
before the blocking _mailbox.offerRaw, but a future maintainer "fixing"
the apparent ordering oddity would re-couple credit replenishment to the
receiver's per-message application drain time and silently re-gate the
sender on downstream operator speed.

Add a short, blunt warning immediately above the request(1) call making
the ordering an explicit do-not-move directive, so the load-bearing
constraint is impossible to miss on a casual reorder.

Also in CommonConstants, in the KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES
Javadoc, spell out that the flow-control window is the per-stalled-stream
receiver-side direct-memory exposure, not just a throughput knob: when an
inbound stream's receiver application queue stalls, the wire can still
buffer up to flowControlWindow bytes of data on that stream before the
peer stops sending. Operators sizing the window need to understand this
OOM implication, not just the throughput / round-trip implication.
…down

Reviewer J2: the four new gRPC mailbox back-pressure / transport config keys
use the legacy /** ... */ HTML Javadoc style with `<p>` tags, while other
recent additions in CommonConstants (e.g. CONFIG_OF_SORT_EXCHANGE_COPY_THRESHOLD,
CONFIG_OF_GROUP_BY_FLUSH_THRESHOLD_FOR_STREAMING) use the Java 25 `///`
markdown style.

Convert the four keys to `///`:

  - KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED
  - KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES
  - KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES
  - KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES
  - KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT

Use markdown paragraph breaks (blank `///`), backticks for inline code, and
`[ClassName#methodName]` for cross-references. Cross-module references are
written fully qualified ([io.grpc.stub.ClientCallStreamObserver#isReady],
[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]) to
match the existing FQN convention in this file. Replaces `{@code ...}` with
backticks and `<b>...</b>` with `**...**` per markdown rules.

No semantic changes; the Y1 scaling formulas and the Y4 per-stalled-stream
sentence are preserved verbatim in the converted form.
…pache#18539

Reviewer comment (3269229763 on MailboxService.java:217) flagged that the
existing TODO understated the residual direct-memory risk: it claimed a
global byte budget was "unnecessary unless fan-outs hit hundreds of peers".

At the 64 MiB transport-layer defaults this PR ships, the per-sender bound
is watermark.high × #peers × #concurrent_queries, which reaches ~3.2-6.4 GiB
per sender at 50-100 peer fan-out — well within the failure envelope of the
original OutOfDirectMemoryError this PR was supposed to bound.

Replace the TODO with a one-liner linking to apache#18539, which
tracks adding a configurable global byte budget across peers as the fourth
back-pressure layer (above the application gate, transport tuning, and
manual receiver-side inbound flow control already in this PR).
…gate

A reviewer pointed out that GrpcSenderBackpressureTest, after the move to the
production-wide 64 MiB transport defaults, no longer fails when the entire
application-level awaitReady() machinery is removed: transport-level
back-pressure alone bounds the sendCount/polledCount ratio well under the
relaxed `polledCount * 7000` threshold at the test's 128-byte payload.

GrpcSenderBackpressureTightGateTest closes that gap. It mirrors the existing
test's scaffolding (two MailboxServices on localhost, slow ~50 polls/sec
reader, fast sender reusing a small RowHeapDataBlock) but overrides the three
new transport configs to the minimum values the stack accepts:

  * grpc.flow.control.window.bytes       = 65535
  * grpc.write.buffer.high.water.mark    = 256 KiB
  * grpc.write.buffer.low.water.mark     = 128 KiB

At those sizes the wire layer cannot absorb the sender's burst, so the
application-level gate in GrpcSendingMailbox.awaitReady is the dominant
back-pressure mechanism. The test asserts the original tight bound
`sendCount <= polledCount * 50 + 10_000` — a bound that is only achievable
if the gate actually engages and parks the sender on the Condition. Removing
the `bypassReady || !_backpressureEnabled` short-circuit or the Condition
wait would let sendCount run away to thousands of times the poll rate and
trip this assertion.

The Javadoc explains the matrix:
  * GrpcSenderBackpressureTest         — wide defaults, gate + transport.
  * GrpcSenderBackpressureDisabledTest — wide defaults, kill-switch off.
  * GrpcSenderBackpressureTightGateTest — narrow transport, gate alone.
…iang)

GrpcSendingMailbox is not a public-facing class, so there is no need to
keep the original 7-arg constructor as a delegating overload. Collapse to
a single 8-arg constructor that takes `boolean backpressureEnabled` and
update the two callers (MailboxService already passes the flag; the
GrpcSendingMailboxTest unit test now passes `true`).
Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal issue; see inline comment.

// does not fully eliminate) that window; fully eliminating it would require serializing all onNext() calls
// under _readyLock, which is more invasive. The bypass path (cancel/close) must push through regardless,
// so this guard only applies when bypassReady == false.
if (!bypassReady && isTerminated()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This post-awaitReady() guard still leaves ClientCallStreamObserver exposed to concurrent use. If cancel() or close() wins the race after this check but before the sender thread reaches _contentObserver.onNext(...), the cancel path can still call onNext()/onCompleted() on the same observer concurrently. The comment above already notes the window is only reduced, not eliminated, and gRPC does not guarantee these observers are thread-safe. This needs a fully serialized send/close path (or a single-thread handoff), otherwise the back-pressure fix still leaves a correctness race in cancellation handling.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — the narrow re-check on its own wasn't enough, and this was independently flagged by @yashmayya in #3269229790. The full fix landed in commit c39e12f1a4 ("Fix data race on _contentObserver by serializing all outbound calls under _readyLock"), after the diff this thread was posted against.

What's there now: _readyLock (the existing ReentrantLock that already guards _readyCond) serializes every call to _contentObserver.onNext / onCompleted, across all three call sites:

  • sendContent (GrpcSendingMailbox.java:372) — lock spans the post-awaitReady isTerminated() re-check and the onNext.
  • send(MseBlock.Eos, …) (:142) — lock spans _senderSideClosed = true + onCompleted so the success-path close is atomic against any racing sendContent.
  • cancel(Throwable) (:268) — lock spans processAndSend(errorBlock, bypassReady=true) + onCompleted. The inner sendContent reacquires the same ReentrantLock via same-thread reentry.

awaitReady itself stays outside the lock — its slow path acquires _readyLock internally to call _readyCond.await, and the fast isReady()-true path remains lock-free. The fast lock()/unlock() pair we pay per send is ~tens of ns vs. onNext cost, no measurable bench impact.

The "narrow-window mitigation" comment you were referring to has been removed; class-level Javadoc now carries the new invariant:

// _readyLock serializes every call to _contentObserver.onNext / onCompleted.
// Acquire it around any new outbound call site you add.

Tests in GrpcSendingMailboxTest (including testRemoteCancelledBySender*, which specifically exercises the cancel-vs-send race) still pass.

gortiz added 2 commits May 20, 2026 14:29
Two startup-time validations on the new transport-tuning configs added
by this PR. Without them, a misconfigured cluster boots cleanly and the
failure surfaces mid-query — at first send to a new peer, or as flapping
back-pressure once any message exceeds the receiver's window.

  * `ChannelManager`: validate `0 < writeBufferLowWaterMarkBytes` at
    construction and eagerly build the `WriteBufferWaterMark`. Netty's
    constructor already enforces `0 ≤ low ≤ high`, so eager
    construction surfaces inverted-or-negative misconfiguration at
    startup instead of on the first `getChannel(...)` call.
  * `GrpcMailboxServer`: validate
    `flowControlWindow ≥ maxInboundMessageSize`. A window smaller than
    the largest possible message makes the stream pathological — no
    single message can ever fit in the available credit and
    `isReady()` flaps continuously.

Matches the `Preconditions.checkArgument` pattern already in place for
`_inboundMessageCredit > 0` in `GrpcMailboxServer`, called out by
Yash's review on PR apache#18519.

`GrpcSenderBackpressureTightGateTest` needed a small adjustment: it was
overriding the flow-control window to 65 535 bytes for the test
scenario, which is below the default 16 MiB `maxInboundMessageSize`.
The test now shrinks both together (window and max-message both
65 535) so the application gate, not the new validation, is what
bounds the sender.
…nder _readyLock

The gRPC ClientCallStreamObserver stored in _contentObserver is not
thread-safe, but three call sites can invoke onNext / onCompleted on it
concurrently:

  * sendContent() (the data path) calls onNext on every chunk.
  * send(MseBlock.Eos) calls onCompleted on the success-EOS path.
  * cancel(Throwable) calls processAndSend() (-> onNext on an error EOS)
    followed by onCompleted().

Round-1 fix 573356e closed the cancel-vs-success-EOS variant by
ordering _senderSideClosed = true before _contentObserver.onCompleted().
A round-2 review (Yash, comment id 3269229790 on PR apache#18519) pointed out
that the symmetric data-vs-cancel variant is not closed by the
post-awaitReady() isTerminated() re-check in sendContent: the window
between that re-check and the subsequent onNext call still permits two
threads to drive the same non-thread-safe observer.

This change makes _readyLock the serialization point for every outbound
observer call. The lock is now held across:

  * sendContent: the isTerminated() re-check and the onNext() call.
  * send(MseBlock.Eos): the _senderSideClosed write and the
    onCompleted() call.
  * cancel(Throwable): the processAndSend() (which itself acquires the
    lock again via sendContent, nesting safely since _readyLock is a
    ReentrantLock) and the trailing onCompleted() call.

awaitReady() intentionally stays outside the lock at the sendContent
site. Its slow path already acquires _readyLock to wait on _readyCond,
and acquiring before awaitReady() would force the fast isReady() == true
path to take and release the lock for no benefit. Visibility holds
because the slow-path lock release happens-before the next acquisition.

Cost: one uncontended lock()/unlock() per chunk on the data path
(~tens of ns), negligible against onNext().
gortiz and others added 2 commits May 20, 2026 15:31
…trol

Adds a rollback knob, pinot.query.runner.grpc.manual.inbound.flow.control.enabled
(default true), that reverts the receiver to gRPC's auto-inbound-flow-control
(pre-PR-apache#18519 behaviour, 1 in-flight message). When false, GrpcMailboxServer
skips disableAutoInboundFlowControl() + the up-front request(N), and
MailboxContentObserver.onNext skips the per-message request(1) replenishment.

Also documents the cancel-propagation latency tradeoff on
KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT and GrpcSendingMailbox.cancel():
the in-band error EOS sits behind buffered inbound when the receiver's
dispatch thread is parked on a full application queue. This hang surface
is pre-existing (it also exists in the rollback path); the wider credit
window only magnifies it. Tracking the proper out-of-band cancel work in
apache#18541.

Addresses review comment 3269229796 on PR apache#18519.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…onfigs

`flowControlWindow` and `writeBufferHighWaterMark` cap roughly the same
conceptual thing (one peer's worth of in-flight bytes) from the two
ends of the wire, but each Javadoc described its piece in isolation.
An operator reading just one of them wouldn't realize the other is its
mirror on the opposite end and wouldn't see why they share a default.

Add a short paragraph to each that names the counterpart, explains the
"aligned at the same default by design" intent, and explains why we
kept them as separate keys (asymmetric tuning).

Documentation only — no behavior change.
Copy link
Copy Markdown
Contributor

@yashmayya yashmayya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Third-pass review — validated the 10 follow-up commits since round 2. All round-2 inline comments were addressed cleanly: the data-path race is genuinely closed by serializing all three outbound observer call sites under _readyLock in c39e12f1a4 (verified there is no deadlock path — nothing that holds _readyLock ever invokes the slow path of awaitReady, so the reentrant-lock + Condition.await "releases only one level" footgun is dodged by construction; serialization runs outside the lock so the per-chunk cost stays at one uncontended lock pair). Startup validation, tight-gate test, kill-switch, TODO→issue link, scaling formulas all check out.

One real bug found — inline comment below.

throws IOException {
PinotConfiguration cfg = new PinotConfiguration(Map.of(
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, _backpressureEnabled,
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, _flowControlWindowBytes));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This benchmark is broken by the new fail-fast validation in 1d29438dc0.

_flowControlWindowBytes is swept over {65535, 1048576, 67108864} (line 123) but the cfg map here only overrides KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES. maxInboundMessageSize stays at the 16 MiB default (DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES). The new Preconditions.checkArgument(_flowControlWindowBytes >= maxInboundMessageSize, ...) at GrpcMailboxServer.java:163 then rejects two of the three axis values at MailboxService.start():

  • 65535 < 16 MiB → IllegalArgumentException
  • 1048576 < 16 MiB → IllegalArgumentException
  • 67108864 (64 MiB) ≥ 16 MiB → passes

So 16 of the 24 @Param configurations (2 × 4 × 2 = 16 with broken window values, out of 2 × 4 × 3 = 24 total) blow up in @Setup instead of running. The whole point of the benchmark — comparing the gate's behavior across the three flow-control-window regimes — collapses to a single regime.

Fix is one line — mirror what GrpcSenderBackpressureTightGateTest.java:118 already does:

PinotConfiguration cfg = new PinotConfiguration(Map.of(
    CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, _backpressureEnabled,
    CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, _flowControlWindowBytes,
    CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
        Math.min(_flowControlWindowBytes, CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)));

(The Math.min keeps the 64 MiB axis using the production-default 16 MiB max-message-size, which is what an operator would actually run; the two smaller axes pin maxInboundMessageSize down to the window so the validation passes.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 6116e5442d. Same Math.min(window, default) clamp you proposed, applied to the bench's PinotConfiguration map.

You're right that I should have caught this when I added the validation — GrpcSenderBackpressureTightGateTest (which I also wrote) was already using exactly this clamp pattern, but I missed updating the bench in the same change. The bench's _flowControlWindowBytes axis is back to running all three cells (64 KiB / 1 MiB / 64 MiB).

Small correction to the description: the post-rewrite bench has 3 axes (_blockSizeBytes × 3, _backpressureEnabled × 2, _flowControlWindowBytes × 3 = 18 cells, not 24 — the old _payloadBytes × 4 axis was dropped when the bench moved to a request-shape probe in a597d63a64). But the conclusion was right — 12 of 18 cells were blowing up at @Setup.

Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one high-signal issue; see inline comment.

@@ -125,7 +169,7 @@ private boolean sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
_contentObserver = getContentObserver();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This lazy initialization is still racy with cancel(): both paths do a plain _contentObserver == null check and can each open their own gRPC stream for the same mailbox. If that happens, the cancel EOS / onCompleted() can run on a different ClientCallStreamObserver than the data path uses, so the receiver can miss the cancellation or see two concurrent streams for one mailbox. Please guard observer creation under _readyLock (or another one-time init primitive) so sender and cancel share a single stream.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 04eb98577c. The race was real — I landed a regression test (concurrentSendAndCancelInitializeContentObserverExactlyOnce) that reproduces it: sender thread + cancel thread driven through CyclicBarrier(2), asserts exactly one observer is created across 200 iterations; fires immediately on iteration 0 against unfixed code (counter sees 2), passes deterministically with the fix.

Fix uses the standard double-checked-lock idiom against the existing _readyLock:

private void ensureContentObserverInitialized() {
  if (_contentObserver != null) {
    return;                          // fast path, volatile read
  }
  _readyLock.lock();
  try {
    if (_contentObserver == null) {
      _contentObserver = getContentObserver();
    }
  } finally {
    _readyLock.unlock();
  }
}

Both bare if (_contentObserver == null) blocks (in sendInternal and cancel) now route through this helper. Kept the lazy init rather than eagerly initializing in the constructor — cancel-before-first-send must still open a stream so the receiver gets an explicit error EOS, so the lazy-init-on-cancel comment in the cancel path is load-bearing.

Your thread-safety prompt also pointed the way to a symmetric race we surfaced in a pre-push strict review: send(Eos) racing cancel's onCompleted on the half-close edge — both pass their top-of-method isTerminated() check, cancel wins the lock, calls onCompleted, and the EOS path's onNext / onCompleted then throws IllegalStateException from gRPC. Fixed in 840c5640ea with a defensive try/catch around the EOS path (matching the existing pattern in cancel) plus a second regression test that races send(Eos) and cancel and asserts neither throws. Plus a half-open-stream leak in close() fixed in aeaacc893d for completeness. Both worth flagging since they're in the same observer-lifecycle area you were looking at.

gortiz and others added 5 commits May 21, 2026 18:07
…ow-control window

The fail-fast validation added in 1d29438 ("Fail fast on invalid gRPC mailbox
transport configuration") asserts `flowControlWindow >= maxInboundMessageSize`.
BenchmarkGrpcMailboxSend sweeps `_flowControlWindowBytes` over
{65535, 1048576, 67108864} but left max-msg-size at the 16 MiB default, so two of
the three axis values fail the new validation at @setup and 12 of 18 benchmark
cells never run.

Mirror what GrpcSenderBackpressureTightGateTest already does: also override
KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES, clamped via Math.min to the
window. The 64 MiB-window cell still uses the production-default 16 MiB
max-msg-size; the two narrow-window cells pin max-msg-size down to the window
so the validation passes.

Bench compile verified; full bench not run (~15 min).

Reported by yashmayya (review 3277728114).
Two call sites in GrpcSendingMailbox were doing an unprotected check-then-act
on _contentObserver:

* sendInternal: if (_contentObserver == null) { _contentObserver = getContentObserver(); }
* cancel:      if (_contentObserver == null) { _contentObserver = getContentObserver(); }

_contentObserver is volatile, so individual reads/writes are atomic, but two
threads (e.g. the sender thread on its first send() racing an external cancel
from an OpChain on-failure callback) could both observe null and each call
getContentObserver(), opening two gRPC streams for the same mailbox id.

The loser stream is leaked — never reaches onCompleted, only reclaimed by the
gRPC deadline. Worse, the cancel EOS can land on a different stream than the
data, silently losing the cancellation signal on the receiver side.

The Y5 _readyLock work does not cover this — both call sites do the lazy init
BEFORE acquiring the lock.

Fix: funnel both call sites through ensureContentObserverInitialized(), which
uses the standard double-checked-lock idiom under the existing _readyLock.

Eager init in the constructor is NOT acceptable: it would change the
cancel-before-first-send semantics — a mailbox that is cancelled before any
send() should open a stream only at cancel time so the receiver gets an
explicit cancel-error EOS instead of waiting for its own deadline. Lazy init
must remain; only the synchronization is being fixed.

Adds a regression test concurrentSendAndCancelInitializeContentObserverExactlyOnce
that uses a CountingGrpcSendingMailbox subclass to count getContentObserver()
calls and races send(data) + cancel(throwable) from two threads synchronized on
a CyclicBarrier across 200 iterations. Verified against the unfixed code: the
race fires on iteration 0 (two streams opened). Deterministically passes after
the fix (5/5 runs).

Reported by xiangfu0 (review 3281051013).
…ception

Symmetric to the data-path race fixed in c39e12f, but on the closing
edge of the gRPC stream. When `send(MseBlock.Eos, ...)` and `cancel(...)`
race past the top-of-method `isTerminated()` checks, both proceed into
their lock sections. Cancel can push its error EOS payload + `onCompleted`
first, half-closing the gRPC stream; the EOS path then calls
`_contentObserver.onNext` (for the EOS payload, since `sendContent`'s
in-lock re-check is skipped under `bypassReady=true`) or
`_contentObserver.onCompleted` (the outer half-close in `send(Eos)`) on
the already half-closed stream and raises `IllegalStateException("call
already half-closed")`, which `sendInternal` does not catch
(it only catches `IOException`).

Fix: wrap the EOS path's `sendInternal` call and its subsequent
`onCompleted` half-close in `try/catch(IllegalStateException)` and treat
the duplicate half-close as benign — the racing cancel has already
delivered an EOS to the receiver. Symmetric to the existing
`catch(Exception)` block in `cancel()`. Option B from the review:
narrower than coupling `sendContent`'s bypass-mode check to
`_statusObserver.isFinished()`, and easier to read.

Adds a regression test `concurrentSendEosAndCancelDoesNotThrow` to
`GrpcSendingMailboxTest`: drives `send(SuccessMseBlock.INSTANCE)` and
`cancel(...)` through a `CyclicBarrier(2)` over 200 iterations with a
stub observer that enforces the real gRPC half-close contract
(subsequent `onNext` / `onCompleted` after a `onCompleted` raise
`IllegalStateException`). Verified the race fires on iteration 1
against the unfixed code; deterministic across multiple runs after
the fix.

Surfaced by the review-concurrency-state pre-push skill on PR apache#18519.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The "should-not-happen" close() fallback path was pushing the internal-
error EOS payload but never calling `_contentObserver.onCompleted()`, so
the client side of the gRPC stream stayed half-open until the per-call
deadline fired. For long-running MSE queries that's a multi-minute pin
on the channel's outbound allocator — precisely the leak the back-
pressure work was introduced to address.

The original code also gated observer access on a raw `_contentObserver
!= null` check-then-use outside the lock, defeating the lazy-init race
fix from 04eb985.

Fix: route observer creation through `ensureContentObserverInitialized()`
(same double-checked-lock helper that `send/cancel` use), then take
`_readyLock` around `processAndSend(...) + onCompleted()` with a
defensive `catch(Exception)` for the half-closed case. Shape mirrors
`cancel()`.

Surfaced by the review-concurrency-state pre-push skill on PR apache#18519.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The `setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters)` registered
in `beforeStart`, plus the `MailboxStatusObserver` callbacks
(`onNext`/`onError`/`onCompleted`), all fire from gRPC channel / Netty
event-loop threads. `wakeWaiters` acquires `_readyLock`, which briefly
blocks the event-loop if a sender thread is mid-`onNext` under the lock.

This is intentional — the lock-held critical section is bounded by a
single `onNext` / `onCompleted` and never spans a blocking wait. Adding
a one-paragraph comment so the next reader doesn't try to "fix" it by
deferring the signal to another executor, which would add an extra
thread hop on every wakeup of the hot back-pressure unblock path.

No code change.

Surfaced by the review-concurrency-state pre-push skill on PR apache#18519.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@gortiz
Copy link
Copy Markdown
Contributor Author

gortiz commented May 22, 2026

After the latest round of reviewer-requested fixes, I ran an internal strict pre-push review across the cumulative diff. It surfaced three additional issues that weren't on any review thread — fixed all three to avoid shipping them.

Fixed

  1. send(Eos) vs cancel race throwing IllegalStateException — symmetric to the data-path race that c39e12f1a4 closed, but on the closing edge of the stream. Both paths pass their top-of-method isTerminated() check before either sets _senderSideClosed. Cancel wins the _readyLock, calls onCompleted(), releases. The EOS path then takes the lock and calls onNext / onCompleted on an already-half-closed stream → IllegalStateException propagates out of send(Eos). Cancel-path swallowed it defensively; EOS-path didn't. Fixed in 840c5640ea with a defensive try/catch around the EOS path (matching the cancel-side pattern) plus a regression test concurrentSendEosAndCancelDoesNotThrow that drives send(EOS) + cancel() through a CyclicBarrier(2) for 200 iterations and asserts neither call throws.

  2. close() leaving the gRPC sender stream half-openclose() pushes an internal-error block via processAndSend(...bypassReady=true) but never calls onCompleted(). The stream sits half-open from the sender side until the per-call deadline fires — for multi-minute MSE queries that's exactly the allocator pin this PR exists to fix. Also had a _contentObserver != null check-then-act outside the lock (same shape ensureContentObserverInitialized exists to prevent). Fixed in aeaacc893d: route through ensureContentObserverInitialized, take _readyLock, set _senderSideClosed = true, call onCompleted() with the same defensive try/catch.

  3. wakeWaiters from Netty event-loop locks _readyLocksetOnReadyHandler fires on a Netty channel/event-loop thread; wakeWaiters then acquires _readyLock. If the sender thread is mid-onNext under that lock, the event-loop thread briefly blocks. Intentional and the lock-held sections are bounded by a single onNext/onCompleted, but the pattern is non-obvious enough to surprise a maintainer debugging latency spikes. Documented in e367aa697f so the next reader doesn't reach for a tryLock+executor restructure without context.

Considered and skipped

  • AtomicBoolean CAS on _senderSideClosed to make the close transition atomic across EOS / cancel / close(). Behavioral effect is now bounded by the defensive try/catches; left as a separate concern.
  • Documenting the precondition on ensureContentObserverInitialized ("callers must isTerminated()-check first") — pure docs.
  • Bench drainer's _stop.set(true) before _readSignal.release() ordering — benign at JVM shutdown.

Strict review run: code-reviewer agent with 8 domain-specialized sub-reviewers in parallel (concurrency-state, performance, correctness-nulls, testing, architecture, naming-api, process-scope, config-backcompat). The three above were everything in the CRITICAL/MAJOR/MINOR-worth-fixing buckets.

gortiz and others added 6 commits May 22, 2026 16:26
…asymmetry

Note in `MailboxService.registerMailboxClientGauges()` Javadoc that only the
two `USED` gauges are mirrored on the client side, and that the six debug-only
server gauges (`ARENAS_*`, `CACHE_SIZE_*`, `THREADLOCALCACHE`, `CHUNK_SIZE`)
are intentionally omitted -- the operational signal that matters on the client
is direct-memory pool exhaustion. Points future contributors at
`GrpcMailboxServer`'s constructor as the place to copy from if the remaining
six are ever wanted.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…h review)

Commit acdd1e3 ("Add kill-switch for manual inbound gRPC mailbox flow
control") added a 4-arg constructor to MailboxContentObserver taking a new
manualInboundFlowControlEnabled flag and kept the previous 3-arg constructor as
a delegating overload that defaults the flag to true. The 3-arg form is public
even though the only out-of-package caller is the unit test — production goes
through GrpcMailboxServer#open, which already passes the config-driven flag
explicitly.

The strict pre-push reviewer (review-naming-api / review-process-scope sub-
reviewers in the code-reviewer skill) flagged this as the same anti-pattern
Jackie-Jiang previously called out for GrpcSendingMailbox, collapsed in
44e2718 ("GrpcSendingMailbox: collapse constructor overload (review by
Jackie-Jiang)"): if the production default ever flips to false for a rollback,
test code that called the test-only overload silently stays on the old path
and the regression is invisible in CI.

Collapse the overload — drop the 3-arg constructor and update the lone test
caller (testMailboxContentObserverInitializationAndFetch) to pass true
explicitly. No behaviour change: the test always wanted the production default
(true), and the existing testOnNextReplenishesInboundCredit /
testOnNextDoesNotReplenishCreditWhenManualFlowControlDisabled tests already
exercised both flag values via the 4-arg ctor.
…push review)

Commit aeaacc8 ("Fix close() leaving the gRPC sender stream half-open") fixed
a real half-open-stream leak but regressed the close()-on-never-sent-mailbox
contract in the process. Pre-aeaacc893d, a GrpcSendingMailbox that was
constructed, never had send()/cancel() called on it, and then was closed was a
silent no-op on the wire — the `if (_contentObserver != null)` short-circuit
made sure of it. Post-aeaacc893d, the short-circuit was replaced with
ensureContentObserverInitialized(), which now opens a fresh gRPC stream just to
push an error EOS and half-close it — three wasted round-trips on a stream that
never needed to exist. For query stages pruned before any data flows that is
measurable wasted I/O, and the channel open itself becomes a new exception
surface during shutdown (channel evicted, executor shutdown, OOM storm).

The strict pre-push reviewer (review-process-scope / review-performance sub-
reviewers in the code-reviewer skill) flagged this on the follow-up review of
the half-open fix.

Fix: read _contentObserver once at the top of the cleanup section. If null,
return without touching the channel — matching the pre-aeaacc893d behaviour.
If non-null, proceed with the lock-protected error-EOS-then-onCompleted that
aeaacc8 added (that part is correct and load-bearing for the half-open fix).

cancel() intentionally stays as-is and DOES open the stream on a never-sent
mailbox — there may be a receiver-side reader blocked on this stream waiting
for the cancel signal, and the receiving mailbox is registered per the
dispatch plan regardless of whether the sender ever called send(). close()
has no such urgency: it is the "should not happen" cleanup fallback, not a
control-plane signal that needs to reach the receiver.

Regression test: closeOnNeverSentMailboxDoesNotOpenStream in
GrpcSendingMailboxTest. Constructs a CountingGrpcSendingMailbox (the same
counting subclass already used by concurrentSendAndCancelInitializeContentObserverExactlyOnce),
calls close() without prior send/cancel, and asserts the getContentObserver
counter stayed at 0 and the ChannelManager was never touched. Verified the
test fails on the unfixed code (expected [0] but found [1]) and passes after
the fix.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Commit 1d29438 ("Fail fast on invalid gRPC mailbox transport
configuration") introduced three `Preconditions.checkArgument` startup
gates plus an eager `new WriteBufferWaterMark(low, high)` invariant
check, but landed without negative-path tests for any of them. If a
future refactor accidentally deletes one, CI would not catch it.

This commit pins each gate with a TestNG
`@Test(expectedExceptions = IllegalArgumentException.class)`:

* `ChannelManagerTest.testConstructorRejectsZeroWriteBufferLowWaterMark`
  — pins the explicit `writeBufferLowWaterMarkBytes > 0` precondition.
* `ChannelManagerTest.testConstructorRejectsLowWatermarkAboveHighWatermark`
  — pins the eager construction of `WriteBufferWaterMark`, which
  surfaces Netty's own `low <= high` invariant at startup instead of on
  the first send.
* `GrpcMailboxServerValidationTest.testStartRejectsZeroInboundMessageCredit`
  — pins the `_inboundMessageCredit > 0` precondition.
* `GrpcMailboxServerValidationTest.testStartRejectsFlowControlWindowSmallerThanMaxMessageSize`
  — pins the `_flowControlWindowBytes >= maxInboundMessageSize`
  precondition.

The `GrpcMailboxServer` tests drive validation through `MailboxService`
construction + `start()` rather than instantiating `GrpcMailboxServer`
directly, which would require a heavy fixture (`MailboxService`,
`TlsConfig`, `SslContext`, `QueryAccessControlFactory`). The whole
point of the gates is fail-at-startup, so firing during
`MailboxService.start()` is equivalent in spirit.

Each test was verified to FAIL against unfixed-production-code (the
corresponding `Preconditions.checkArgument` / eager constructor call
temporarily removed) and to PASS against the production code at
e367aa6.

Surfaced by the review-process-scope pre-push skill on PR apache#18519.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…pt-in

Three production defaults flipped to ship the mechanism without changing
runtime behaviour for operators who have not hit the OOM this PR fixes:

  * `pinot.query.runner.grpc.sender.backpressure.enabled`: true → false.
    The `isReady()`-gated sender wait is now opt-in.
  * `pinot.query.runner.grpc.manual.inbound.flow.control.enabled`:
    true → false. The receiver returns to gRPC's auto-inbound-flow-
    control (one message in flight, post-`onNext`-return credit
    replenishment) — the pre-PR behaviour.
  * `pinot.query.runner.grpc.inbound.message.credit`: 128 → 1. Even
    when an operator flips the manual-flow-control flag on, the initial
    in-flight window stays at one message until the operator
    explicitly widens it.

Together, these defaults make this PR ship the mechanism + configs +
observability without altering on-the-wire behaviour for any cluster.
Operators who hit the OOM (slow consumer / large fan-out / skewed
shuffle) can flip on the two flags + raise the credit per the Javadoc
sizing formulas.

Tests that exercise the bounded-sender behaviour now explicitly opt in
(`GrpcSenderBackpressureTest`, `GrpcSenderBackpressureTightGateTest`).
`GrpcSenderBackpressureDisabledTest` already explicitly disabled the
sender gate and is unchanged. The JMH bench also explicitly opts in so
its A/B numbers across the `_flowControlWindowBytes` axis remain
interpretable as "the feature, with the production-default credit it
was designed with."

Javadoc on all three keys updated to reflect the opt-in framing.
After commit 9ff9bb4 flipped the production defaults to opt-in,
"Disabled" in the test name no longer matches reality — the test
exercises the *default* path through `awaitReady`, not a kill-switch
that opts out of an on-by-default feature. Rename + rewrite the
class-level Javadoc to describe what the test actually pins now: the
off-path of the gate, which is also the production default.

The other matrix tests describe themselves accurately and stay as-is:
  * `GrpcSenderBackpressureTest` — opt-in path, wide transport.
  * `GrpcSenderBackpressureTightGateTest` — opt-in path, narrow transport.
The cross-reference in the tight-gate test's Javadoc is updated.

No behaviour change; renames + comments only.
@gortiz gortiz force-pushed the mse-backpressure branch from 6a87e13 to f6d24f8 Compare May 25, 2026 15:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Improvement to existing functionality memory Related to memory usage or optimization metrics Related to metrics emission and collection multi-stage Related to the multi-stage query engine

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants