feat(broker): extract doScatter/doReduce hooks in single-stage broker request handlers #18316

Merged
yashmayya merged 6 commits into apache:master from navina:feat/broker-result-cache-wiring
Apr 25, 2026

Conversation

@navina
Contributor

@navina navina commented Apr 24, 2026

Summary

  • Extracts protected doScatter and doReduce methods in SingleConnectionBrokerRequestHandler and GrpcBrokerRequestHandler, replacing the previous hook-method design (mergeWithCachedDataTables, mergeWithCachedStreamingResponses)
  • The default processBrokerRequest delegates to doScatter then doReduce — identical behavior to before
  • Subclasses can now own the full pipeline: preScatter → super.doScatter → merge → super.doReduce without needing OSS hook methods or ThreadLocals
  • Fixes a correctness bug: ScatterResult.getNumServersQueried() / getNumServersResponded() were computed lazily from _dataTableMap.size(), but BrokerReduceService.reduceOnDataTable drains the map via iterator.remove() during reduce. Counts are now snapshotted eagerly at ScatterResult construction time.
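
A minimal sketch of the scatter-then-reduce pipeline described above, assuming heavily simplified stand-ins rather than the real Pinot classes. `BaseHandler`, `MergingHandler`, the `String` payloads, and the one-argument method signatures are all hypothetical; only the method names `doScatter`, `doReduce`, and `processBrokerRequest` come from the PR description.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a broker request handler; not the real Pinot API.
class BaseHandler {
  // Scatter: pretend to query two servers and gather their responses.
  protected Map<String, String> doScatter(String query) {
    Map<String, String> responses = new HashMap<>();
    responses.put("server1", "rows-a");
    responses.put("server2", "rows-b");
    return responses;
  }

  // Reduce: combine the gathered responses into one result.
  protected String doReduce(Map<String, String> responses) {
    return "reduced " + responses.size() + " responses";
  }

  // Default pipeline: scatter, then reduce, with nothing in between.
  public String processBrokerRequest(String query) {
    return doReduce(doScatter(query));
  }
}

// Hypothetical subclass that owns the full pipeline and adds a merge step.
class MergingHandler extends BaseHandler {
  @Override
  public String processBrokerRequest(String query) {
    Map<String, String> responses = super.doScatter(query);
    // Merge step owned by the subclass (an extra, locally sourced entry).
    responses.put("extra", "rows-c");
    return super.doReduce(responses);
  }
}
```

In this sketch the base class keeps its default behavior, while the subclass inserts its own step between the two protected methods without any hook method on the base class.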

Changes

SingleConnectionBrokerRequestHandler

  • Removed mergeWithCachedDataTables hook; processBrokerRequest now calls doScatter + doReduce
  • New protected doReduce(originalBrokerRequest, serverBrokerRequest, dataTableMap, scatterResult, scatterGatherStartTimeNs, timeoutMs, rawTableName) — takes dataTableMap and scatterResult separately so subclasses can pass a merged map while stats always reflect real servers queried
  • ScatterResult: added _numServersQueried / _numServersResponded fields snapshotted at construction

GrpcBrokerRequestHandler

  • Removed mergeWithCachedStreamingResponses hook; processBrokerRequest now calls doScatter + doReduce
  • New protected doReduce(originalBrokerRequest, responseMap, timeoutMs)

…gle-stage engine

Add protected extension points to `SingleConnectionBrokerRequestHandler` and
`GrpcBrokerRequestHandler` that allow subclasses to inject cached DataTables
before the reduce step, without duplicating the scatter-gather path.

Changes:
- `SingleConnectionBrokerRequestHandler`: extract `doScatter()` + `ScatterResult`;
  add `mergeWithCachedDataTables()` no-op hook called between scatter and reduce;
  expose `_brokerReduceService`, `_queryRouter`, `_failureDetector` as `protected final`
- `GrpcBrokerRequestHandler`: add `mergeWithCachedStreamingResponses()` no-op hook;
  `dataTableToStreamingIterator()` now `throws IOException` instead of swallowing it;
  expose `_streamingReduceService`, `_streamingQueryClient`, `_failureDetector` as
  `protected final`; add `createGrpcBrokerRequestHandler()` factory hook on
  `BaseBrokerStarter`
- `GrpcBrokerRequestHandlerTest`: round-trip and IOException-propagation unit tests

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@navina force-pushed the feat/broker-result-cache-wiring branch from aa86026 to 2245d58 on April 24, 2026 07:16
@xiangfu0 added the query (Related to query processing) and feature (New functionality) labels on Apr 24, 2026
@codecov-commenter

codecov-commenter commented Apr 24, 2026

Codecov Report

❌ Patch coverage is 0% with 57 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.67%. Comparing base (6b79b57) to head (1da0e4c).
⚠️ Report is 15 commits behind head on master.

Files with missing lines Patch % Lines
...thandler/SingleConnectionBrokerRequestHandler.java 0.00% 51 Missing ⚠️
...roker/requesthandler/GrpcBrokerRequestHandler.java 0.00% 4 Missing ⚠️
...e/pinot/broker/broker/helix/BaseBrokerStarter.java 0.00% 2 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18316      +/-   ##
============================================
+ Coverage     63.60%   63.67%   +0.07%     
  Complexity     1659     1659              
============================================
  Files          3246     3246              
  Lines        197510   197549      +39     
  Branches      30578    30577       -1     
============================================
+ Hits         125620   125798     +178     
+ Misses        61845    61706     -139     
  Partials      10045    10045              
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.60% <0.00%> (+0.02%) ⬆️
java-21 63.66% <0.00%> (+0.08%) ⬆️
temurin 63.67% <0.00%> (+0.07%) ⬆️
unittests 63.67% <0.00%> (+0.07%) ⬆️
unittests1 55.62% <ø> (+0.05%) ⬆️
unittests2 35.10% <0.00%> (+0.04%) ⬆️


navina and others added 3 commits April 24, 2026 10:13
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…aware subclassing

Replace the hook-based design (mergeWithCachedDataTables, mergeWithCachedStreamingResponses)
with explicit doScatter/doReduce protected methods in both SingleConnectionBrokerRequestHandler
and GrpcBrokerRequestHandler. Subclasses can now own the full processBrokerRequest pipeline —
preScatter → doScatter → merge → doReduce — without OSS hooks or ThreadLocals.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…nstruction time

BrokerReduceService.reduceOnDataTable mutates the passed dataTableMap via
iterator.remove() as it processes entries. Computing numServersQueried and
numServersResponded lazily from _dataTableMap.size() after reduce returns 0.
Snapshot both counts eagerly at ScatterResult construction before the map
is passed to doReduce.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
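
The failure mode in the commit message above can be reproduced in isolation. This is a sketch under stated assumptions: `LazyResult`, `EagerResult`, and `DrainDemo` are hypothetical classes, and `drain` only mimics a reduce step that consumes entries through `Iterator.remove()`; none of it is the actual Pinot code.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical: derives the count from the live map each time it is asked.
class LazyResult {
  private final Map<String, String> _dataTableMap;

  LazyResult(Map<String, String> dataTableMap) {
    _dataTableMap = dataTableMap;
  }

  int getNumServersResponded() {
    return _dataTableMap.size();
  }
}

// Hypothetical: snapshots the count once, at construction time.
class EagerResult {
  private final Map<String, String> _dataTableMap;
  private final int _numServersResponded;

  EagerResult(Map<String, String> dataTableMap) {
    _dataTableMap = dataTableMap;
    _numServersResponded = dataTableMap.size();
  }

  int getNumServersResponded() {
    return _numServersResponded;
  }
}

class DrainDemo {
  // Mimics a reduce that removes each entry as it processes it.
  static void drain(Map<String, String> map) {
    Iterator<Map.Entry<String, String>> it = map.entrySet().iterator();
    while (it.hasNext()) {
      it.next();
      it.remove();
    }
  }
}
```

With two entries in the map, calling `DrainDemo.drain` leaves the lazy getter reporting 0 while the eager one still reports 2, which is the difference the snapshot is meant to preserve.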
@navina changed the title from "feat(broker): add protected hooks for cache-aware interceptors in single-stage engine" to "feat(broker): extract doScatter/doReduce hooks in single-stage broker request handlers" on Apr 24, 2026
Contributor

@yashmayya yashmayya left a comment

Thanks @navina, looks pretty clean, I just have a couple of comments.

navina and others added 2 commits April 24, 2026 17:50
…accept ScatterResult

- Add ScatterResultStats inner class to snapshot live-server counts at
  scatter time, decoupling them from the data table map
- Replace ScatterResult constructor (which derived counts from map size) with
  one that accepts explicit ScatterResultStats, so subclasses can substitute
  the map without corrupting numServersQueried/numServersResponded
- doReduce now accepts ScatterResult directly (drop standalone dataTableMap
  param); call site uses scatterResult.getDataTableMap() internally
- Addresses reviewer comment on PR apache#18316

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
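
A rough sketch of the constructor change described in the commit message above, assuming simplified types. `StatsSketch` and `ScatterResultSketch` are hypothetical stand-ins for `ScatterResultStats` and `ScatterResult`, the `String` map values replace real data tables, and `withDataTableMap` is an illustrative helper that does not appear in the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for ScatterResultStats: counts captured at scatter time.
final class StatsSketch {
  final int numServersQueried;
  final int numServersResponded;

  StatsSketch(int numServersQueried, int numServersResponded) {
    this.numServersQueried = numServersQueried;
    this.numServersResponded = numServersResponded;
  }
}

// Hypothetical stand-in for ScatterResult: counts come from explicit stats,
// never from the size of the map.
final class ScatterResultSketch {
  private final Map<String, String> _dataTableMap;
  private final int _numServersQueried;
  private final int _numServersResponded;

  ScatterResultSketch(Map<String, String> dataTableMap, StatsSketch stats) {
    _dataTableMap = dataTableMap;
    _numServersQueried = stats.numServersQueried;
    _numServersResponded = stats.numServersResponded;
  }

  Map<String, String> getDataTableMap() {
    return _dataTableMap;
  }

  int getNumServersQueried() {
    return _numServersQueried;
  }

  int getNumServersResponded() {
    return _numServersResponded;
  }

  // Illustrative helper: swap in a different map while keeping the same counts.
  ScatterResultSketch withDataTableMap(Map<String, String> replacement) {
    return new ScatterResultSketch(
        replacement, new StatsSketch(_numServersQueried, _numServersResponded));
  }
}
```

Because the counts are passed in explicitly, a caller can substitute a larger or smaller map and the reported server counts still reflect what was recorded at scatter time.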
…ve cache language from OSS

- Remove dataTableToStreamingIterator from GrpcBrokerRequestHandler — it has
  no callers in OSS; only StarTree's GRPC handler uses it
- Delete GrpcBrokerRequestHandlerTest (only tested the removed method)
- Remove ByteString, Collections, DataTable imports that are no longer used
- Scrub cache-specific language from doScatter/doReduce Javadocs
- Addresses reviewer comment on PR apache#18316

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@navina navina requested review from xiangfu0 and yashmayya April 25, 2026 01:06
Comment on lines +236 to +240
private final long _totalResponseSize;
private final boolean _timedOut;
private final Exception _sendException;
private final int _numServersQueried;
private final int _numServersResponded;
Contributor

Why not hold a ScatterResultStats object inside here instead?

Contributor Author

I think the guidance was to avoid deviating too much from the existing abstractions and to minimize changes in the existing query handling path. That's probably why the code ended up this way.
I can make the change if you insist.

@navina navina requested a review from yashmayya April 25, 2026 06:01
@yashmayya yashmayya merged commit 068907e into apache:master Apr 25, 2026
30 of 32 checks passed

Labels

feature New functionality query Related to query processing

4 participants