
Add latency tracking for MSE adaptive routing#18791

Open
timothy-e wants to merge 13 commits into
apache:masterfrom
timothy-e:timothy-e-mse-ar-latency

Conversation

@timothy-e timothy-e commented Jun 17, 2026

Contributor

#18553 added in-flight request tracking for MSE in the adaptive routing stats. This PR adds per-server latency. It looks very large, but much of it is tests, and some new files were introduced, which come with a certain amount of boilerplate in Java.

1. The planner marks stages as needing timing. It assigns FragmentTypes, and the dispatcher maps those types to timing collection in AdaptiveRoutingStageClassification.java. Collection is enabled only on trusted stages: those that receive directly from pure leaves, or that are singleton-leaf stages themselves. Stages fed by non-leaf senders are excluded so that their timings do not include upstream cascade delay.

2. We increment each server's in-flight request count before submitting. In the in-flight request PR we did it afterwards, because we didn't want to extract the servers from submit before calling it, and we assumed submit would be near-instant. In practice, we saw a case (reproduced by issuing a SIGSTOP) where the submit timed out and MSE adaptive routing couldn't track it.

3. Stages track their elapsed time and return it as the UPSTREAM_SERVER_RESPONSE_TIMES_MS stat (BaseMailboxReceiveOperator, BlockingMultiStreamConsumer, AdaptiveRoutingUpstreamTimings).

Each BaseMailboxReceiveOperator tracks per-sender wall-clock elapsed time (from when its BlockingMultiStreamConsumer.OfMseBlock is constructed until each sender's EOS arrives). The start time is captured at consumer construction time, not at query start, to avoid pipeline-breaker inflation: if a pipeline breaker on this stage blocks for 1000ms waiting on a slow sender, fast senders' EOS blocks sit unconsumed in their queues during that time, and measuring from query start would report ~1000ms for all of them.

These per-sender timings are encoded as "hostname|port=elapsedMs;..." in a new UPSTREAM_SERVER_RESPONSE_TIMES_MS stat key on BaseMailboxReceiveOperator.StatKey and propagated up through the query stats. A serialized string is used because it is more straightforward than adding a Map type to the StatKey system.

4. The broker reads leaf-server timings, resolves hostname|mailboxPort sender keys to full instance IDs, and accumulates the maximum observed latency per instance across all trusted stages. A decrementedServers set prevents double-recording.

5. Query timeout: on query cancellation (e.g. reaching the timeout), we try to cancel with stats. We preserve completed timings and inject elapsed time for pending senders.

Testing

1. Performance impact: We ran some performance tests and saw no impact.

2. Manually with tc

Choose a server that we want to induce latency on. ssh to that server and:

sudo tc qdisc add dev ens5 root handle 1: prio
sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms

sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
  match ip dport 8442 0xffff flowid 1:3

We induced latency only for MSE to validate that adaptive routing can be influenced by only MSE queries.

To reset the latency:

sudo tc qdisc del dev ens5 root # cleanup

We saw the score spike for the targeted servers.

3. Chaos Agent

We kicked off a chaos agent run that does

  1. 90s of 600ms MSE latency
  2. 30s with no faults
  3. 90s of 600ms SSE latency
  4. 90s with no faults
  5. 90s of 1200ms MSE latency
  6. 30s with no faults
  7. 90s of 1200ms SSE latency

(query timeout is 1s).

image

We did a similar experiment with SIGSTOP issued to a single server's Pinot process.

Screenshot 2026-06-18 at 9 32 49 am

@codecov-commenter

codecov-commenter commented Jun 17, 2026


Codecov Report

❌ Patch coverage is 83.23864% with 59 lines in your changes missing coverage. Please review.
✅ Project coverage is 64.81%. Comparing base (25e378b) to head (c7682df).
⚠️ Report is 42 commits behind head on master.

Files with missing lines Patch % Lines
.../pinot/query/service/dispatch/QueryDispatcher.java 78.66% 22 Missing and 10 partials ⚠️
...e/dispatch/AdaptiveRoutingStageClassification.java 73.46% 7 Missing and 6 partials ⚠️
...rvice/dispatch/AdaptiveRoutingUpstreamTimings.java 83.78% 3 Missing and 3 partials ⚠️
...ava/org/apache/pinot/common/datatable/StatMap.java 20.00% 0 Missing and 4 partials ⚠️
...y/runtime/operator/BaseMailboxReceiveOperator.java 91.89% 1 Missing and 2 partials ⚠️
...uery/planner/physical/DispatchablePlanContext.java 83.33% 0 Missing and 1 partial ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18791      +/-   ##
============================================
+ Coverage     64.79%   64.81%   +0.01%     
  Complexity     1322     1322              
============================================
  Files          3393     3396       +3     
  Lines        211235   211550     +315     
  Branches      33206    33274      +68     
============================================
+ Hits         136878   137122     +244     
- Misses        63303    63350      +47     
- Partials      11054    11078      +24     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-21 64.81% <83.23%> (+0.01%) ⬆️
temurin 64.81% <83.23%> (+0.01%) ⬆️
unittests 64.81% <83.23%> (+0.01%) ⬆️
unittests1 57.06% <82.95%> (+0.04%) ⬆️
unittests2 37.10% <9.37%> (-0.05%) ⬇️


@timothy-e timothy-e marked this pull request as ready for review June 18, 2026 13:40
@timothy-e

Contributor Author

Hey @yashmayya, I re-opened this one to replace #18646 because I force-pushed while that one was closed.

It's 99% the same code, but with two bug fixes based on what we've seen with it deployed to prod - 402e089 and 5434659.

AdaptiveRoutingStageClassification classification = null;
Consumer<QueryServerInstance> preDispatchHook = null;
if (statsManager != null) {
  classification = AdaptiveRoutingStageClassification.classify(dispatchableSubPlan);

Contributor

This adds the trusted-stage/timing setup for the unary path, but submitAndReduceWithStream() still never does the equivalent classification / COLLECT_UPSTREAM_TIMING_KEY wiring and still decrements every server with -1. So a query that enables STREAM_STATS silently loses the new MSE latency signal entirely, including submit-timeout cases. Can we plumb the same flow into stream mode, or explicitly gate adaptive-routing latency off there?

Contributor Author

Integrating this with the stream mode is best left for a follow-up PR, because it would expand the scope and require a lot more testing than what I've already done.

Explicitly turning it off would require reverting some of the changes added in the streaming PR, which also feels weird to do in this PR.

Since the current state is "streaming stats doesn't track latency for MSE adaptive routing," I think it makes sense to leave it as is and create a GitHub issue to support it.


// Stage 0 (the broker reducer) is always trusted: its sender timings are measured directly
// by the multi-consumer, not propagated through intermediate stages.
trustedStageIds.add(0);

Contributor

Unconditionally trusting stage 0 looks unsafe. When the broker receives from an intermediate stage, this mailbox timing already includes upstream cascade delay, and extractMaxTimingsPerInstance() later takes Math::max across all trusted stages. If the same server also ran a leaf stage, the stage-0 value can overwrite that server's real leaf latency and bias the adaptive-routing EMA upward. Can we only trust stage 0 when its direct senders are leaf stages, or otherwise ignore stage-0 timings once a trusted leaf stage already attributed that instance?

Contributor Author

Nice catch. In my original iterations, I found this to be helpful. But in the iterations since then, it seems it was no longer necessary. I removed these lines and retested and found that we still clearly identify the problematic server.
Screenshot 2026-06-23 at 11 13 44 am

@timothy-e timothy-e force-pushed the timothy-e-mse-ar-latency branch from 54cdfd6 to 7c19e14 Compare June 23, 2026 15:10

@xiangfu0 xiangfu0 left a comment

Contributor

Found one high-signal correctness issue; see inline comment.

if (!hasScannedTable) {
  return INTERMEDIATE;
}
List<Integer> singletonSenderStageIds = getSingletonReceiveSenderStageIds(root);

Contributor

This classification only looks at SINGLETON inputs. A stage that scans a table and also has a HASH/BROADCAST mailbox receive from a non-leaf stage still returns LEAF here, and a stage with both a leaf SINGLETON input and any non-leaf input still returns SINGLETON_LEAF. AdaptiveRoutingStageClassification then trusts/tracks those stages as if they were pure leaves, so the elapsed times from their receive operators include upstream cascade delay and can poison the adaptive-routing EMA for healthy servers. Please downgrade any fragment that has any mailbox receive from a non-leaf sender to INTERMEDIATE (or otherwise exclude it from trusted/tracked), and add a regression test for the mixed-input topology.

Contributor Author

This is an awesome suggestion, thanks Xiang! I implemented it, retested, and the results are much better!

Repeating the testing from 2 days ago, we see 25% lower error counts during induced server latency with this latest change.

image

timothy-e and others added 13 commits June 25, 2026 11:37
cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/597 added in-flight request tracking for MSE in the adaptive routing stats. This PR adds per-server latency. It looks very large, but ~950 lines are tests, and two new files were introduced, which come with a certain amount of boilerplate in Java.

**1. Per-sender elapsed time in `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat** (`BaseMailboxReceiveOperator`, `BlockingMultiStreamConsumer`, `AdaptiveRoutingUpstreamTimings`)

Each `BaseMailboxReceiveOperator` now tracks per-sender wall-clock elapsed time (from when its `BlockingMultiStreamConsumer.OfMseBlock` is constructed until each sender's EOS arrives). The start time is captured at consumer construction time, not at query start, to avoid pipeline-breaker inflation: if a pipeline breaker on this stage blocks for 1000ms waiting on a slow sender, fast senders' EOS blocks sit unconsumed in their queues during that time, and measuring from query start would report ~1000ms for all of them.
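
As an illustration of the timing rule above, here is a minimal sketch. The class name `SenderElapsedTracker` and the injectable clock are assumptions made for the example; they are not part of this PR, and the real logic lives in `BlockingMultiStreamConsumer`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical helper for illustration only; not the actual Pinot implementation. */
final class SenderElapsedTracker {
  private final LongSupplier _clockMs;
  private final long _startMs;
  private final Map<String, Long> _senderElapsedMs = new HashMap<>();

  SenderElapsedTracker(LongSupplier clockMs) {
    _clockMs = clockMs;
    // The clock starts when the consumer is constructed, not when the query starts.
    _startMs = clockMs.getAsLong();
  }

  /** Records the elapsed time the first time a sender's EOS is consumed. */
  void onEos(String senderKey) {
    _senderElapsedMs.putIfAbsent(senderKey, _clockMs.getAsLong() - _startMs);
  }

  /** Returns a copy of the timings for senders whose EOS has been seen. */
  Map<String, Long> completedTimings() {
    return new HashMap<>(_senderElapsedMs);
  }
}
```

In production code the clock would presumably be `System::currentTimeMillis`; passing it in keeps the sketch testable.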

These per-sender timings are encoded as `"hostname|port=elapsedMs;..."` in a new `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat key on `BaseMailboxReceiveOperator.StatKey` and propagated up through the query stats. A serialized string is used because it is more straightforward than adding a Map type to the `StatKey` system.
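
A rough sketch of what a round trip through this string format could look like. `UpstreamTimingCodecSketch` is a made-up name, and details such as how malformed entries are handled are assumptions, not the behavior of `AdaptiveRoutingUpstreamTimings`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical codec for the "hostname|port=elapsedMs;..." format; illustration only. */
final class UpstreamTimingCodecSketch {
  private UpstreamTimingCodecSketch() {
  }

  /** Joins entries as key=value pairs separated by ';'. */
  static String encode(Map<String, Long> timings) {
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Long> entry : timings.entrySet()) {
      if (sb.length() > 0) {
        sb.append(';');
      }
      sb.append(entry.getKey()).append('=').append(entry.getValue());
    }
    return sb.toString();
  }

  /** Parses the encoded string back into a map; entries without '=' are skipped (an assumption). */
  static Map<String, Long> decode(String encoded) {
    Map<String, Long> out = new LinkedHashMap<>();
    if (encoded == null || encoded.isEmpty()) {
      return out;
    }
    for (String entry : encoded.split(";")) {
      int eq = entry.lastIndexOf('=');
      if (eq <= 0) {
        continue;
      }
      out.put(entry.substring(0, eq), Long.parseLong(entry.substring(eq + 1)));
    }
    return out;
  }
}
```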

**2. Broker reads leaf-server timings via `StageClassification` + `extractMaxTimingsPerInstance`** (`QueryDispatcher`, `StageClassification`)

After `runReducer` completes, the broker classifies the query plan using `StageClassification.classify()`. Two kinds of stages are "trusted" (their `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stats are consulted):
 - **Pure leaf receivers**: stages that directly receive from a non-SINGLETON leaf stage.
 - **SINGLETON leaf stages receiving from another leaf**: a leaf stage that scans a dimension table and receives upstream data via a SINGLETON exchange, but only when the sender stage is also a leaf.

Stages that receive from any non-leaf (intermediate) stage are excluded because the non-leaf sender may have waited on a slow upstream cascade, inflating per-sender timings at the receiver.

Only pure leaf servers (non-SINGLETON) are added to `_trackedServers` and are eligible for EMA updates. SINGLETON leaf and intermediate servers are excluded because their timings reflect cascade delays rather than their own scan performance.
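
The trust rule can be sketched roughly as follows. This is a simplification under stated assumptions: the `Kind` enum and both method names are hypothetical, and the sketch treats only pure `LEAF` senders as acceptable, which is one reading of the rules above rather than a copy of `StageClassification`.

```java
import java.util.List;

/** Hypothetical simplification of the trusted/tracked rules; illustration only. */
final class StageTrustSketch {
  enum Kind { LEAF, SINGLETON_LEAF, INTERMEDIATE }

  private StageTrustSketch() {
  }

  /**
   * A receiving stage is trusted only if it has at least one direct sender and every
   * direct sender is a pure leaf. Any non-leaf sender disqualifies the stage.
   */
  static boolean isTrusted(List<Kind> directSenders) {
    if (directSenders.isEmpty()) {
      return false;
    }
    for (Kind sender : directSenders) {
      if (sender != Kind.LEAF) {
        return false;
      }
    }
    return true;
  }

  /** Only servers running pure (non-SINGLETON) leaf stages are eligible for EMA updates. */
  static boolean isTrackedForEma(Kind serverStageKind) {
    return serverStageKind == Kind.LEAF;
  }
}
```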

For each trusted stage, the broker resolves `hostname|mailboxPort` sender keys to full instance IDs and accumulates the maximum observed latency per instance across all trusted stages. A `decrementedServers` set prevents double-recording.
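
A minimal sketch of the max-per-instance accumulation, assuming the sender-key-to-instance-ID lookup is already available as a map. The class name, method signature, and the instance IDs used in any example are hypothetical; the real `extractMaxTimingsPerInstance` reads from query stats.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the broker-side accumulation; illustration only. */
final class MaxTimingPerInstanceSketch {
  private MaxTimingPerInstanceSketch() {
  }

  /**
   * Resolves each "hostname|mailboxPort" sender key to an instance ID and keeps the
   * largest latency seen for that instance across all trusted stages.
   */
  static Map<String, Long> maxPerInstance(List<Map<String, Long>> trustedStageTimings,
      Map<String, String> senderKeyToInstanceId) {
    Map<String, Long> result = new HashMap<>();
    for (Map<String, Long> stageTimings : trustedStageTimings) {
      for (Map.Entry<String, Long> entry : stageTimings.entrySet()) {
        String instanceId = senderKeyToInstanceId.get(entry.getKey());
        if (instanceId == null) {
          continue; // Unknown sender key: skipped in this sketch.
        }
        result.merge(instanceId, entry.getValue(), Math::max);
      }
    }
    return result;
  }
}
```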

**3. Fallback latency for servers not covered by extracted timings** (`QueryDispatcher` finally block)

Two fallback paths handle servers not recorded via `extractMaxTimingsPerInstance`:
- **Fallback 1 (no partial timings at all)**: If `decrementedServers` is empty (e.g. the query timed out before any stats returned), all incremented servers are recorded with `-1L` (no timing available). This avoids marking all servers with a misleading full-elapsed-time value when we have no real signal.
- **Fallback 2 (partial timings received)**: For servers not yet decremented:
  - If the server is in `trackedServers` (a pure leaf server): records the full elapsed time (`System.currentTimeMillis() - submitTimeMs`), since the server likely timed out or was unresponsive.
  - Otherwise (intermediate/SINGLETON server): records `-1L` (not tracked for EMA).
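
The two fallback paths, as described in this commit message, can be sketched as a pure function. `FallbackLatencySketch` and its signature are assumptions for illustration; the actual code runs inside the `QueryDispatcher` finally block and records into the stats manager rather than returning a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical restatement of the fallback rules; illustration only. */
final class FallbackLatencySketch {
  static final long NO_TIMING = -1L;

  private FallbackLatencySketch() {
  }

  /** Returns the latency to record for every incremented server that was not yet decremented. */
  static Map<String, Long> fallbackLatencies(Set<String> incrementedServers, Set<String> decrementedServers,
      Set<String> trackedServers, long elapsedMs) {
    Map<String, Long> out = new HashMap<>();
    boolean noPartialTimings = decrementedServers.isEmpty();
    for (String server : incrementedServers) {
      if (decrementedServers.contains(server)) {
        continue; // Already recorded with a real timing.
      }
      if (noPartialTimings) {
        out.put(server, NO_TIMING); // Fallback 1: no signal at all.
      } else if (trackedServers.contains(server)) {
        out.put(server, elapsedMs); // Fallback 2: pure leaf server that never reported.
      } else {
        out.put(server, NO_TIMING); // Fallback 2: intermediate/SINGLETON server.
      }
    }
    return out;
  }
}
```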

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

I ran a PerformanceTestWorkflow
```
{
    "numUsers": 5,
    "clusterInfo": {"clusterName": "rad-canary", "clusterRegion":
  "northwest"},
    "tenant": "long-lived-a",
    "durationMinutes": 90,
    "loadTestConfig": {
      "perQueryExpectations": {
        "sum_payments_group_by_merchant_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 244, "maxP50DurationMs": 165,
  "minSuccessRate": 0.999},
        "count_sum_payments_performance_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 229, "maxP50DurationMs": 150,
  "minSuccessRate": 0.999},
        "select_payments_three_table_join_performance_since_1h_v2":
   {"minQps": 2.0, "maxP99DurationMs": 376, "maxP50DurationMs":
  255, "minSuccessRate": 0.999},
        "select_payments_cte_join_window_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 432, "maxP50DurationMs": 322,
   "minSuccessRate": 0.999},
        "select_payments_union_join_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 339, "maxP50DurationMs": 184,
   "minSuccessRate": 0.999},
        "select_payments_nested_subquery_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 196, "maxP50DurationMs": 101,
   "minSuccessRate": 0.999}
      },
      "aggregateExpectations": {"minQps": 16.0, "maxP99DurationMs":
   432, "maxP50DurationMs": 322, "minSuccessRate": 0.999}
    }
  }
```

These queries are all complex MSE queries that mirror some of the patterns in the billing analytics MRR queries.

I also added [another server to each replica in canary, and another replica](https://git.corp.stripe.com/stripe-internal/mint/pull/2124801), to further validate the distribution.

And induced latency in one of two ways:

**1. Manually with tc**

Choose a server that we want to induce latency on. `ssh` to that server and:

```
sudo tc qdisc add dev ens5 root handle 1: prio
sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms

sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
  match ip dport 8442 0xffff flowid 1:3

```

We induced latency only for MSE to validate that adaptive routing can be influenced by only MSE queries.

To reset the latency:

```
sudo tc qdisc del dev ens5 root # cleanup
```

We left this on for [a while](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=2m&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777497188195&to=1777502138949), and we can see there's a small amount of contamination to another server, but generally the server with the induced latency is identified as a clear outlier.

**2. With Chaos Scenarios**

Or run a [chaos-scenario](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=30s&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777557375599&to=1777558632565) that runs a 5 minute 600ms latency increase, then a 1m 1200ms increase, then a 1m blackhole.

And with adaptive routing off, [we see](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=30s&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777557375599&to=1777558632565)
* 2960 total timeouts
* 2140 timeouts from the 5 minute 600ms spike.
* Overall QPS dropped from 55 to 14 during the 600ms / 1200ms / blackhole.
* p50 increased from 70ms to 360ms
* p75 increased from 90ms to 810ms
* p99 increased from 130ms to 890ms

And with adaptive routing on, [we see](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=2m&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=pinotdbstreaming&from=1777560397381&to=1777561273611)
* 931 total timeouts (3x reduction vs adaptive routing off)
* 662 timeouts from the 5 minute 600ms spike (3x reduction vs adaptive routing off)
* Overall QPS dropped from 53 to 43 during the 600ms spike, and 33 during the 1200ms / blackhole.
* p50 increased from 70ms to 120ms (3x reduction vs adaptive routing off)
* p75 increased from 90ms to 130ms (6x reduction vs adaptive routing off)
* p99 increased from 130ms to 820ms

Degraded servers are detected better than servers that never respond at all (or respond after the query timeout). But they do still appear in the latency stats.

**Rad Rose**

Got Claude to generate a [load test script](https://git.corp.stripe.com/gist/timothye/a2346d0af61476c4032fc623cc5a7298) based on the [billing analytics queries](https://git.corp.stripe.com/gist/dang/bb38f694ea62b0f92c7a8fec944f6612). It
* queries the _rose tables instead of the _testing tables
* replaces the merchant in the queries with a merchant that has actual data in QA
* ensures all queries return some results by changing time filters as necessary.

After a few minutes of the load test, I added 600ms of latency to qa-pinotdbserver--01e5bebf93cd9888d.northwest.stripe.io.

[There seems](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-rose&var-pinot_tenant=billinganalyticsrose&var-host_type=All&from=1777579989000&to=1777580954563) to be contamination into 1 or 2 realtime servers, and occasionally one offline server, but generally, the offline server is distinctly regarded as higher. The contamination effects would likely be lower if adaptive routing were enabled (not just the stats), because there would be fewer timeouts in general.
<img width="1384" alt="Screenshot 2026-04-30 at 4 29 11 pm" src="https://git.corp.stripe.com/user-attachments/assets/e217e755-92e6-4389-8876-15e7ad0736e9" />

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-01T12:24:16Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/607
…che#633)

cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

Some performance improvements: removing redundant or thrown away work.

1. Remove TreeMap sort from encode() — The encoded string doesn't need deterministic key ordering (the broker decodes into a HashMap anyway), so removed the unnecessary sort allocation.
2. Replace String.split() with indexOf-based parsing in decode() — split() allocates a regex Pattern and a String[] on every call. Manual indexOf loop parses in-place with zero intermediate allocations.
3. Short-circuit mergeEncodings() when either arg is null/empty — Avoids decoding + re-encoding when one side has no data (common case: first worker EOS has nothing to merge with).
4. Move stage classification to planning time, which simplifies the classification a bit.
5. Since we now classify before dispatching, we can inject `collectUpstreamTiming=true` into trusted stages' `customProperties`. Each untrusted stage does not need to init a hashmap and collect timings.
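
To illustrate items 2 and 3, here is a sketch of an `indexOf`-based decode and a short-circuiting merge. The class name is hypothetical, and keeping the larger value when both sides report the same sender is an assumption made for the example, not something this commit message states.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of indexOf-based parsing and a short-circuiting merge; illustration only. */
final class TimingMergeSketch {
  private TimingMergeSketch() {
  }

  /** Walks the string with indexOf instead of split(), so no intermediate String[] is created. */
  static Map<String, Long> decode(String encoded) {
    Map<String, Long> out = new HashMap<>();
    int pos = 0;
    int len = encoded.length();
    while (pos < len) {
      int end = encoded.indexOf(';', pos);
      if (end < 0) {
        end = len;
      }
      int eq = encoded.indexOf('=', pos);
      if (eq > pos && eq < end) {
        out.put(encoded.substring(pos, eq), Long.parseLong(encoded.substring(eq + 1, end)));
      }
      pos = end + 1;
    }
    return out;
  }

  /** Returns the other argument untouched when one side is null or empty (may return null if both are). */
  static String mergeEncodings(String a, String b) {
    if (a == null || a.isEmpty()) {
      return b;
    }
    if (b == null || b.isEmpty()) {
      return a;
    }
    Map<String, Long> merged = decode(a);
    // Assumption: on a key collision, keep the larger elapsed time.
    decode(b).forEach((key, value) -> merged.merge(key, value, Math::max));
    StringBuilder sb = new StringBuilder();
    for (Map.Entry<String, Long> entry : merged.entrySet()) {
      if (sb.length() > 0) {
        sb.append(';');
      }
      sb.append(entry.getKey()).append('=').append(entry.getValue());
    }
    return sb.toString();
  }
}
```

Because the merged map is a `HashMap`, the order of entries in the output string is unspecified, which matches item 1's point that ordering does not matter to the broker.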

[STREAMANALYTICS-4484](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4484)

Started a PerformanceTestWorkflow with all the latest MSE queries on rad-canary, and ran a [chaos-scenario](https://amp.qa.corp.stripe.com/chaos-scenarios/scenario-b3c7091d-8e3c-41e8-b7ca-8330b9270a51) on it. The affected servers are clearly identified.
<img width="1249" alt="Screenshot 2026-05-12 at 11 42 00 am" src="https://git.corp.stripe.com/user-attachments/assets/47a6c84b-5891-44c8-9d6f-58aba36846ab" />
[[Grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778599320072&to=1778600291193&viewPanel=16)]

On master, the canary performance tests (the original 4) had results:
```
    "Aggregate QPS 35.62 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 82 ms <= 240 ms",
    "Aggregate P50 duration 54 ms <= 171 ms"
```
and
```
    "Aggregate QPS 33.50 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 85 ms <= 240 ms",
    "Aggregate P50 duration 58 ms <= 171 ms"
```

After deploying this change, we saw:
```
    "Aggregate QPS 38.53 >= 12.61",
    "Aggregate success rate 100.00% >= 99.90%",
    "Aggregate P99 duration 75 ms <= 240 ms",
    "Aggregate P50 duration 51 ms <= 171 ms"
```

QPS is all over the place, but there's no obvious regression.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-12T16:01:57Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/633
[Minion run](https://devboxproxy.qa.corp.stripe.com/timothye/agent/agent_run_UVLJfTr66uix4g)

cc stripe-private-oss-forks/pinot-reviewers
r? dang

Fixes the failing unit test `MailboxReceiveOperatorTest.copyStatMapsIncludesPartialTimingWhenSlowSenderNeverCompletes` introduced alongside the adaptive routing latency tracking feature (apache#633).

The test was missing the `collectUpstreamTiming=true` op-chain metadata flag that `BaseMailboxReceiveOperator` requires to enable per-sender timing collection. Without this flag, `_streamIdToSenderKey` is empty, `_senderElapsedMs` is never populated, and `copyStatMaps()` returns null for `UPSTREAM_SERVER_RESPONSE_TIMES_MS` — causing the `assertNotNull` at line 305 to fail.

Changes:
- **`OperatorTestUtil`**: Added an overloaded `getOpChainContext()` that accepts custom `opChainMetadata`, so tests can supply operator configuration flags.
- **`MailboxReceiveOperatorTest`**: Added a `getOperator()` overload accepting metadata, and updated the failing test to pass `COLLECT_UPSTREAM_TIMING_KEY -> "true"`.

The `copyStatMapsIncludesPartialTimingWhenSlowSenderNeverCompletes` test was failing in CI with:
```
java.lang.AssertionError: copyStatMaps() should include partial timing for fast sender that already sent EOS expected object to not be null
```

This was a test-only bug — the production `copyStatMaps()` implementation is correct, but the test was not setting up the operator context to enable the timing collection feature it was asserting on.

Ran the full `MailboxReceiveOperatorTest` suite (11 tests) — all pass:
```
Tests run: 11, Failures: 0, Errors: 0, Skipped: 0
BUILD SUCCESS
```

https://cibot.corp.stripe.com/builds/bui_UVLY8q2t30qyaW shows a success

This is a test-only change with no effect on production services.

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-12T20:49:23Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/644
)

cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

When a query timed out, MSE adaptive routing (AR) stats did not properly identify the failing server, because it tried not to mark all servers as degraded. This was less obvious when the AR stats were not separated between MSE and SSE. However, since we have seen degradations that only affect one query engine, and we know that we want to successfully handle query timeouts, this needed to be fixed.

This PR:
- Collects stats on the timeout path about which servers responded from cancelWithStats, and passes them to extractMaxTimingsPerInstance.
- Enables timing collection on the broker's stage-0 mailbox receive operator so it captures direct-sender attribution even when EOS never arrives.
- Adds getSenderElapsedMsIncludingPending() to BlockingMultiStreamConsumer, which reports wall-clock elapsed time for senders that haven't completed yet (used on the cancel/timeout path).
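
The "including pending" idea from the last bullet can be sketched like this. `PendingAwareTimingsSketch` and its method names are hypothetical, and timestamps are passed in explicitly to keep the example deterministic; the real method is `getSenderElapsedMsIncludingPending()` on `BlockingMultiStreamConsumer`.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of reporting elapsed time for senders that never sent EOS; illustration only. */
final class PendingAwareTimingsSketch {
  private final long _startMs;
  private final Set<String> _expectedSenders;
  private final Map<String, Long> _completedElapsedMs = new HashMap<>();

  PendingAwareTimingsSketch(Set<String> expectedSenders, long startMs) {
    _expectedSenders = new LinkedHashSet<>(expectedSenders);
    _startMs = startMs;
  }

  /** Records the elapsed time when a sender's EOS arrives. */
  void onEos(String senderKey, long nowMs) {
    _completedElapsedMs.putIfAbsent(senderKey, nowMs - _startMs);
  }

  /** Completed senders keep their recorded time; pending senders get the time elapsed so far. */
  Map<String, Long> elapsedMsIncludingPending(long nowMs) {
    Map<String, Long> out = new HashMap<>(_completedElapsedMs);
    for (String sender : _expectedSenders) {
      out.putIfAbsent(sender, nowMs - _startMs);
    }
    return out;
  }
}
```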

| Scenario                                             | Before PR                                                                                                              | After PR                                                                                                                                                             |
| ---------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| Normal completion (no timeout)                       | `onEos()` merges sender timings; `extractMaxTimingsPerInstance` reads trusted stages; leaf servers get actual latency  | Unchanged                                                                                                                                                            |
| Worker sends error, broker cancels                   | `cancel()` (fire-and-forget, no stats); all servers get -1L                                                            | Unchanged                 |
| Broker times out (`TimeoutException`)                | `tryRecover` → `cancelWithStats`, but cancel stats not passed to `extractMaxTimingsPerInstance`; no responder tracking | `tryRecover` → `cancelWithStats`; cancel stats merged into `extractMaxTimingsPerInstance` for trusted worker stages; non-responders marked degraded with `elapsedMs` |
| Cancel itself times out (some servers don't respond) | No distinction between responders and non-responders; all untracked servers get -1L                                    | `respondingServerIds` tracks who responded; non-responders marked degraded with `elapsedMs` (tier 3)                                                                 |
| Throwable (OOM, etc.)                                | `cancel()` (fire-and-forget); all servers get -1L                                                                      | Unchanged                                                                                                                                                            |

[STREAMANALYTICS-4418](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4418)

We ran a PerformanceTestWorkflow with the following arguments (on an isolated env where https://git.corp.stripe.com/stripe-internal/mint/pull/2110397 is deployed, since it's been reverted on master):
```
{
    "numUsers": 5,
    "clusterInfo": {"clusterName": "rad-canary", "clusterRegion":
  "northwest"},
    "tenant": "long-lived-a",
    "durationMinutes": 90,
    "loadTestConfig": {
      "perQueryExpectations": {
        "sum_payments_group_by_merchant_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 244, "maxP50DurationMs": 165,
  "minSuccessRate": 0.999},
        "count_sum_payments_performance_since_1h_v2": {"minQps":
  2.0, "maxP99DurationMs": 229, "maxP50DurationMs": 150,
  "minSuccessRate": 0.999},
        "select_payments_three_table_join_performance_since_1h_v2":
   {"minQps": 2.0, "maxP99DurationMs": 376, "maxP50DurationMs":
  255, "minSuccessRate": 0.999},
        "select_payments_cte_join_window_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 432, "maxP50DurationMs": 322,
   "minSuccessRate": 0.999},
        "select_payments_union_join_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 339, "maxP50DurationMs": 184,
   "minSuccessRate": 0.999},
        "select_payments_nested_subquery_performance_since_1h_v2":
  {"minQps": 2.0, "maxP99DurationMs": 196, "maxP50DurationMs": 101,
   "minSuccessRate": 0.999}
      },
      "aggregateExpectations": {"minQps": 16.0, "maxP99DurationMs":
   432, "maxP50DurationMs": 322, "minSuccessRate": 0.999}
    }
  }
```

We kicked off a [chaos agent](https://amp.qa.corp.stripe.com/chaos-scenarios/scenario-f874bad3-419e-4a7a-870b-06010ba9fd63) run that does
1. 90s of 600ms **MSE latency**
2. 30s with no faults
3. 90s of 600ms **SSE latency**
4. 90s with no faults
5. 90s of 1200ms **MSE latency**
6. 30s with no faults
7. 90s of 1200ms **SSE latency**

Before this change was deployed, we saw the 600ms spikes appear in MSE and SSE, but the 1200ms spikes only appeared in the SSE graph.
<img width="739" alt="Screenshot 2026-05-14 at 12 49 13 pm" src="https://git.corp.stripe.com/user-attachments/assets/f168450a-4242-4e58-899f-0dfc741fd52e" />
[[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778699889324&to=1778700572859)]

After this change was deployed, we saw each spike appear in each engine.
<img width="731" alt="Screenshot 2026-05-14 at 1 27 27 pm" src="https://git.corp.stripe.com/user-attachments/assets/83e672bf-6ba1-4ed1-8121-88a85be78d7b" />

[[grafana](https://g-8916660cfe.grafana-workspace.us-west-2.amazonaws.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto_interval_min_interval&var-host_cluster=northwest&var-pinot_cluster=rad-canary&var-pinot_tenant=long-lived-a&var-host_type=All&from=1778779007090&to=1778779619206)]

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-05-15T17:00:38Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/648

# Conflicts:
#	pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
…e SIGSTOP-style timeouts (apache#673)

cc stripe-private-oss-forks/pinot-reviewers
r? dang saiswapnilar

`incrementedServers` was populated only after `submit()` returned, so when `submit()` hung on a frozen-but-connected server and threw `TimeoutException`, the set was empty and `recordPerServerLatencies` was a no-op — the adaptive routing EMA was never updated.

SSE already increments before dispatch (`AsyncQueryResponse.java:62`). MSE did not, because doing so meant pulling the server list out of `submit`; we now see that it is necessary. Instead of leaking those implementation details, this change passes a pre-dispatch hook through `submit` that calls `recordStatsForQuerySubmission` before entering the `try` block.

With this change, when `submit()` times out, `incrementedServers` is already populated -> `tryRecover` calls `cancelWithStats` -> unresponsive servers are absent from `respondingServerIds` -> Tier 3 in `recordPerServerLatencies` -> `latency = elapsedMs` -> EMA updated.
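
The pre-dispatch hook idea can be sketched in isolation as below. This is a minimal illustration, not the code in this PR: the `submit` and `dispatch` methods, the `simulateHang` flag, and the server names are all hypothetical stand-ins, and the real dispatcher signature may differ. The point it shows is that the hook runs before any call that can hang, so the caller already holds the server set when the timeout is caught.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

public class PreDispatchHookSketch {
  /** Hypothetical stand-in for the dispatcher's submit; not Pinot's real API. */
  static void submit(List<String> servers, boolean simulateHang, Consumer<Set<String>> preDispatchHook)
      throws TimeoutException {
    // Run the hook before anything that can block, so the caller learns the server set up front.
    preDispatchHook.accept(new LinkedHashSet<>(servers));
    if (simulateHang) {
      // Stands in for a frozen-but-connected server that makes the dispatch time out.
      throw new TimeoutException("submit timed out");
    }
  }

  /** Returns the servers whose in-flight counters were incremented, whether or not submit succeeded. */
  static Set<String> dispatch(List<String> servers, boolean simulateHang) {
    Set<String> incrementedServers = new LinkedHashSet<>();
    try {
      submit(servers, simulateHang, incrementedServers::addAll);
    } catch (TimeoutException e) {
      // Recovery path: incrementedServers is already populated here,
      // so per-server latency can still be recorded for the unresponsive servers.
    }
    return incrementedServers;
  }

  public static void main(String[] args) {
    System.out.println(dispatch(List.of("server-1", "server-2"), true));
  }
}
```

Had the set been filled only after `submit` returned, the timeout branch in this sketch would see an empty set, which mirrors the no-op described above.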

[STREAMANALYTICS-4543](https://jira.corp.stripe.com/browse/STREAMANALYTICS-4543)

In the below graph, `before-submit` refers to this PR.
![image](https://git.corp.stripe.com/user-attachments/assets/a71bf3ff-1c9b-401b-a003-42aed6a39c77)

We tested with a script (below) that:
* runs a 4 minute load test
* after 1 minute of the load test, sends `svc -p` to the Pinot process on one realtime and one offline server
* after 2 more minutes (3 minutes into the load test), sends `svc -c` to continue
* between runs, resets the adaptive routing params (alpha to 1, autodecay window to 1000ms) so that the stats are reset

Before this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:32:54.129Z&to=2026-06-16T13:50:48.107Z&timezone=America%2FToronto&var-overrides=&refresh=10s) inconclusive results for the MSE latency stats:
<img width="1361" alt="Screenshot 2026-06-16 at 10 36 50 am" src="https://git.corp.stripe.com/user-attachments/assets/40f447b0-fbaa-42a0-bf12-3b568af8065f" />

With this change, [grafana showed](https://grafana.corp.stripe.com/d/8JZ_bymSz/pinot-adaptive-routing-rollout?orgId=1&var-datasource=zb219lV4k&var-min_interval=$__auto&var-host_cluster=northwest&var-pinot_cluster=$__all&var-pinot_tenant=billinganalyticsrose&var-host_type=$__all&from=2026-06-16T13:59:29.589Z&to=2026-06-16T14:14:23.905Z&timezone=America%2FToronto&var-overrides=&refresh=10s) a clear signal in the MSE latency stats:
<img width="1365" alt="Screenshot 2026-06-16 at 10 37 54 am" src="https://git.corp.stripe.com/user-attachments/assets/a1721aca-f7d7-42d3-b117-fd1c7bc5e2e0" />

```bash
#!/usr/bin/env bash
# Requires bash: uses arrays and `set -o pipefail`.
set -euo pipefail

ALPHAS=(0.666)
AUTODECAYS=(60)
EXPONENTS=(2 2 2)
RESET_ALPHA=1
RESET_AUTODECAY_MS=1000
RESET_EXPONENT=2
RESET_SLEEP_SECONDS=30
CONTROLLER_HOST=qa-pinotdbcontroller--0de7e24d978a877a9.northwest.stripe.io
STREAMING_HOST=qa-pinotdbstreaming--0606dd9b61a58afdb.northwest.stripe.io
SERVER_HOST=qa-pinotdbserver--022dc12bf5cb176d2.northwest.stripe.io

RESULTS_DIR="results/$(date +%Y-%m-%d)"
mkdir -p "$RESULTS_DIR"

TOTAL=$((${#ALPHAS[@]} * ${#AUTODECAYS[@]} * ${#EXPONENTS[@]}))
RUN=0

set_routing_params() {
  local alpha="$1"
  local autodecay_ms="$2"
  local exponent="$3"
  local payload

  payload=$(printf '{"pinot.broker.adaptive.server.selector.autodecay.window.ms": "%s", "pinot.broker.adaptive.server.selector.ewma.alpha": "%s", "pinot.broker.adaptive.server.selector.hybrid.score.exponent": "%s"}' \
    "$autodecay_ms" "$alpha" "$exponent")

  for attempt in 1 2 3 4 5; do
    # -f makes curl exit non-zero on HTTP error responses, so the retry loop also covers them.
    if ssh "$CONTROLLER_HOST" \
      "curl -f -X POST localhost:9000/cluster/configs -H 'Content-Type: application/json' -d '$payload'"; then
      return 0
    fi

    if [ "$attempt" -eq 5 ]; then
      echo "ERROR: Failed to set params after 5 attempts" >&2
      return 1
    fi

    echo "Attempt ${attempt} failed, retrying in 5s..."
    sleep 5
  done
}

pause_servers() {
  echo "Pausing servers..."
  until ssh "$STREAMING_HOST" 'sudo svc -p /etc/service/pinot-streaming' && \
        ssh "$SERVER_HOST" 'sudo svc -p /etc/service/pinot-server'; do
    echo "Pause failed, retrying in 2s..."
    sleep 2
  done
}

unpause_servers() {
  echo "Unpausing servers..."
  until ssh "$STREAMING_HOST" 'sudo svc -c /etc/service/pinot-streaming' && \
        ssh "$SERVER_HOST" 'sudo svc -c /etc/service/pinot-server'; do
    echo "Unpause failed, retrying in 2s..."
    sleep 2
  done
}

run_test() {
  local alpha="$1"
  local autodecay_ms="$2"
  local exponent="$3"
  local timestamp results_file

  timestamp=$(date +%H%M%S)
  results_file="${RESULTS_DIR}/alpha=${alpha}_autodecay=${autodecay_ms}_exponent=${exponent}_${timestamp}.txt"

  echo "Starting traffic (results will be saved to ${results_file})..."
  pay remote ssh run-ba-testing-traffic -- python3 pinot-rose-load-test.py --duration 240 > "$results_file" 2>&1 &
  local traffic_pid=$!

  echo "Setting autodecay=${autodecay_ms}ms, alpha=${alpha}, exponent=${exponent}..."
  set_routing_params "$alpha" "$autodecay_ms" "$exponent"

  echo "Waiting 60s..."
  sleep 60

  pause_servers

  echo "Waiting 120s before unpausing..."
  sleep 120

  unpause_servers

  echo "Waiting for traffic generation to complete..."
  wait $traffic_pid
  echo "Done. Results saved to ${results_file}"
}

for autodecay in "${AUTODECAYS[@]}"; do
  for alpha in "${ALPHAS[@]}"; do
    for exponent in "${EXPONENTS[@]}"; do
      RUN=$((RUN + 1))
      echo "=== Run ${RUN}/${TOTAL}: alpha=${alpha}, autodecay=${autodecay}s, exponent=${exponent} ==="
      run_test "$alpha" "$((autodecay * 1000))" "$exponent"
      echo ""

      if [ "$RUN" -lt "$TOTAL" ]; then
        echo "Resetting adaptive routing to alpha=${RESET_ALPHA}, autodecay=${RESET_AUTODECAY_MS}ms, exponent=${RESET_EXPONENT} between runs..."
        set_routing_params "$RESET_ALPHA" "$RESET_AUTODECAY_MS" "$RESET_EXPONENT"
        echo "Sleeping ${RESET_SLEEP_SECONDS}s between runs..."
        sleep "$RESET_SLEEP_SECONDS"
      fi
    done
  done
done

echo "=== Sweep complete: ${TOTAL} runs ==="
```
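
To see why the reset step uses `RESET_ALPHA=1`, here is a small sketch of a textbook exponentially weighted moving average. It assumes the usual update rule `new = alpha * sample + (1 - alpha) * previous`; whether the adaptive server selector uses exactly this form is an assumption here, and the class name and sample values are made up for illustration.

```java
public class EwmaResetSketch {
  /** Textbook EWMA update: alpha weights the new sample, (1 - alpha) weights the history. */
  static double update(double previous, double sample, double alpha) {
    return alpha * sample + (1.0 - alpha) * previous;
  }

  public static void main(String[] args) {
    double stale = 1200.0;  // hypothetical latency left over from a paused server, in ms
    double fresh = 100.0;   // hypothetical latency of the next healthy response, in ms

    // With the sweep value alpha=0.666, part of the stale value survives the update.
    double afterSweepAlpha = update(stale, fresh, 0.666);
    // With alpha=1, the history term is multiplied by zero, so one sample replaces the average.
    double afterReset = update(stale, fresh, 1.0);

    System.out.printf("alpha=0.666 -> %.1f ms, alpha=1 -> %.1f ms%n", afterSweepAlpha, afterReset);
  }
}
```

Under that assumption, a single response after the reset is enough to discard the previous run's latency history, which is what the between-run step relies on.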

Stripe-Original-Repo: stripe-private-oss-forks/pinot
Stripe-Monotonic-Timestamp: v2/2026-06-17T03:35:29Z/0
Stripe-Original-PR: https://git.corp.stripe.com/stripe-private-oss-forks/pinot/pull/673

@timothy-e force-pushed the timothy-e-mse-ar-latency branch from 7c19e14 to c7682df on June 25, 2026 15:39