
[multistage] New mode: push stage stats to the broker via long-lived plan-submission stream #18375

@gortiz

Description


Context

MSE today propagates per-operator stats by piggy-backing them onto the mailbox stream just before EOS. This has two well-known problems:

  • Partial stats on error. Senders pick one of N parallel downstream workers to send stats to (so the same stat isn't counted N times). When a query fails, the receiver aborts on the first error block, and any stats that were on a different path are dropped. The reported stats are non-deterministic across runs.
  • Timeout fragility. When stats can't reach the broker via the mailbox path, the broker falls back to the cancel RPC, which returns whatever stats are still parked on the server. Those stats are tied to the query context's lifetime and can be evicted before the broker fetches them.

A separate, long-standing pain point is that stats arrive at the broker as a flat inorder list of operator entries with no embedded operator → PlanNode mapping and no explicit tree shape. Today's InStageStatsTreeBuilder reconstructs the tree by leaning on each operator type's implicit arity. That works for transforms (1) and joins (2), but set operations are still N-ary at runtime (PlanNodeToOpChain.visitSetOp iterates setOpNode.getInputs() with no fixed arity), so the existing reconstruction is lossy whenever a set op has more than two inputs. The flat-binary, "merge bytes without deserializing" format was only worth its complexity because stats used to hop through many mailboxes downstream — with the new stream-mode design, server → broker is a single hop, so we can move to a more honest format that carries the tree explicitly.

This issue proposes a new mode where servers push stats — together with operator type, operator → PlanNode mapping, and the operator tree shape — directly to the broker over a long-lived gRPC stream. That eliminates both stats-loss problems and fixes the tree-reconstruction problem in one move. It also lets us route cancel through the same stream, fixing a related cancellation issue we have today. The legacy mailbox-piggybacked format is left unchanged: legacy mode keeps using MultiStageQueryStats's existing flat-binary serialization and InStageStatsTreeBuilder's existing heuristic.

Proposal

Add a new bidi RPC alongside today's unary Submit and Cancel, opt-in via config. When enabled, the broker keeps the stream open for the query lifetime; servers push one OpChainComplete per opchain they ran (one per (stageId, workerId) pair on that server), and a final ServerDone. Stats no longer travel through the mailbox in this mode (servers send EOS without stats — already supported today, since SuccessMseBlock is stats-free).

Wire protocol

New RPC in pinot-common/src/main/proto/worker.proto:

rpc SubmitWithStream(stream BrokerToServer) returns (stream ServerToBroker);

message BrokerToServer {
  oneof payload {
    QueryRequest submit = 1;   // sent first — same shape as today
    CancelRequest cancel = 2;  // optional, supersedes the separate Cancel RPC
  }
}

message ServerToBroker {
  oneof payload {
    QueryResponse submit_ack = 1;  // replaces today's synchronous Submit response
    OpChainComplete opchain  = 2;  // one per opchain (stage, worker) run on this server
    ServerDone done          = 3;  // sent after all opchains have reported
  }
}

message OpChainComplete {
  int32 stage_id   = 1;
  int32 worker_id  = 2;
  bool success     = 3;
  string error_msg = 4;          // populated when success=false
  MultiStageStatsTree stats = 5; // structured, tree-shaped — see below
}

message ServerDone {}

// Multi-stage stats produced by one opchain. Carries the current stage as an
// explicit operator tree, plus any upstream-stage trees this opchain accumulated.
message MultiStageStatsTree {
  int32 current_stage_id = 1;
  StageStatsNode current_stage = 2;
  // Sparse map of upstream stage id -> tree.
  map<int32, StageStatsNode> upstream_stages = 3;
}

// One node of the operator tree for a single stage.
message StageStatsNode {
  // The MultiStageOperator.Type id, same byte we serialize today.
  int32 operator_type_id = 1;
  // Plan node ids that compile down to this operator. One-to-many; the leaf
  // operator typically owns the whole sub-tree of v1 plan nodes below the leaf
  // boundary.
  repeated int32 plan_node_ids = 2;
  // Opaque StatMap bytes — same encoding the existing StatMap.serialize produces.
  bytes stat_map = 3;
  // Children in left-to-right order. Arity is whatever the operator dictates
  // (1 for transforms, 2 for joins, N for set ops).
  repeated StageStatsNode children = 4;
}

Why bidi vs server-streaming: bidi lets cancel ride the same stream and turns broker-initiated stream close into an implicit cancel signal — no second RPC needed.

Server side (QueryServer, new handler for SubmitWithStream)

  1. First inbound message must be BrokerToServer.submit. The handler runs the same plan deserialization → opchain construction logic that Submit runs today (_submissionExecutorService).
  2. Synchronous submit_ack is pushed back on the stream — replaces today's unary QueryResponse. Carries early errors (plan parsing, malformed metadata).
  3. Each opchain is wired up with a completion callback. When an opchain finishes (success or failure), the callback projects its MultiStageQueryStats onto the live MultiStageOperator tree to build a MultiStageStatsTree (see "Stats payload format & merging" below) and pushes an OpChainComplete on the stream.
  4. After the last opchain on this server has reported, push ServerDone and half-close the server side.
  5. If the broker sends BrokerToServer.cancel (or closes the stream), call into QueryRunner.cancel(requestId) — same path as today's separate Cancel RPC.
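
For concreteness, a minimal sketch of the handler shape, assuming the stubs generated from the proposed proto above. startOpChains and the _queryRunner / _submissionExecutorService fields mirror existing QueryServer structure; exact signatures are illustrative, not final:

import io.grpc.stub.StreamObserver;

// Sketch only: assumes classes generated from the proposed proto above.
public StreamObserver<BrokerToServer> submitWithStream(StreamObserver<ServerToBroker> responseObserver) {
  return new StreamObserver<BrokerToServer>() {
    private volatile long _requestId = -1;

    @Override
    public void onNext(BrokerToServer msg) {
      switch (msg.getPayloadCase()) {
        case SUBMIT:
          _requestId = msg.getSubmit().getRequestId(); // hypothetical accessor on QueryRequest
          // Same plan deserialization + opchain construction path as the unary
          // Submit, executed off the gRPC I/O thread.
          _submissionExecutorService.submit(() -> startOpChains(msg.getSubmit(), responseObserver));
          break;
        case CANCEL:
          _queryRunner.cancel(_requestId); // same path as today's separate Cancel RPC
          break;
        default:
          break;
      }
    }

    @Override
    public void onError(Throwable t) {
      // Broker-initiated stream close doubles as an implicit cancel.
      if (_requestId >= 0) {
        _queryRunner.cancel(_requestId);
      }
    }

    @Override
    public void onCompleted() {
      // Broker half-closed its side; ServerDone drives completion of ours.
    }
  };
}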

Server threading model

  • gRPC server I/O thread: receives BrokerToServer messages; routes inbound submit / cancel; never blocks.
  • _submissionExecutorService: plan deserialization → opchain construction (unchanged from today).
  • Per-opchain operator threads: run the operator pipeline; on completion invoke the completion callback.
  • Opchain completion (any worker thread, serialized via a per-query lock): when an opchain finishes it acquires the per-query lock, builds the OpChainComplete proto, calls StreamObserver.onNext, and releases. gRPC requires onNext to be called serially; a per-query ReentrantLock (or synchronized on a per-query state object) gives us that without spawning a dedicated executor. No new thread or pool is introduced.
  • gRPC cancel handler thread: when the broker closes the stream or sends BrokerToServer.cancel, this fires on a gRPC thread; it calls QueryRunner.cancel(requestId) and lets opchains tear down on their own threads.
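
A sketch of that serialization discipline around onNext, using a per-query state object (names illustrative):

import io.grpc.stub.StreamObserver;
import java.util.concurrent.locks.ReentrantLock;

// Per-query server-side state: serializes onNext calls from whichever worker
// threads finish opchains, since gRPC forbids concurrent onNext on one stream.
final class PerQueryStreamState {
  private final ReentrantLock _lock = new ReentrantLock();
  private final StreamObserver<ServerToBroker> _stream;
  private int _pendingOpChains;

  PerQueryStreamState(StreamObserver<ServerToBroker> stream, int opChainCount) {
    _stream = stream;
    _pendingOpChains = opChainCount;
  }

  // Completion callback target; runs on the finishing opchain's worker thread.
  void onOpChainComplete(OpChainComplete complete) {
    _lock.lock();
    try {
      _stream.onNext(ServerToBroker.newBuilder().setOpchain(complete).build());
      if (--_pendingOpChains == 0) {
        _stream.onNext(ServerToBroker.newBuilder().setDone(ServerDone.getDefaultInstance()).build());
        _stream.onCompleted(); // half-close the server side
      }
    } finally {
      _lock.unlock();
    }
  }
}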

Broker side (dispatcher)

  1. When the new mode is enabled for a query, the dispatcher opens one SubmitWithStream per server instead of unary Submit.

  2. Per-query state tracks: outstanding (stageId, workerId) opchains expected to report, a Map<Integer, StageStatsNode> accumulator (per-stage merged tree), and the set of open server streams.

  3. As OpChainComplete arrives, remove that (stageId, workerId) from the outstanding set and merge each StageStatsNode from the payload into the accumulator (per-node tree-shape check + StatMap.merge). On tree-shape mismatch, mark mergeFailed for that stage and drop the offending payload.

  4. On any server reporting an error (either an OpChainComplete{success=false} or a stream onError), the broker sends BrokerToServer.cancel on every other open stream for that query. This actively shuts down peer servers rather than waiting for them to time out.

  5. When the broker's data-side mailbox finishes (success EOS or error EOS), it enters a wait window for any outstanding OpChainComplete messages. Default 50 ms, hard-bounded by the query's remaining timeout. On error, we still wait — the whole point of this mode is that error-path stats are reliable.

  6. If the wait window expires with anything outstanding, the broker emits a per-stage "stats coverage" structure rather than just a boolean partial flag:

    stageCoverage[i] = { responded: 6, mergeFailed: 1, missing: 2 }
    

    index i is stage id. mergeFailed covers payloads that arrived but the broker couldn't combine (version mismatch).
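
In code, the accumulator and merge rule from steps 2–3 could look like the following sketch. StageStatsNode is the proposed proto message; the per-node StatMap byte-level merge helper is elided:

import java.util.HashMap;
import java.util.Map;

// Broker-side per-query accumulator sketch: tree-shape check + per-node merge.
// A shape mismatch marks mergeFailed for the stage and never fails the query.
final class StageStatsAccumulator {
  private final Map<Integer, StageStatsNode> _perStage = new HashMap<>();
  private final Map<Integer, Integer> _mergeFailed = new HashMap<>();

  void accept(int stageId, StageStatsNode incoming) {
    StageStatsNode current = _perStage.get(stageId);
    if (current == null) {
      _perStage.put(stageId, incoming);
    } else if (sameShape(current, incoming)) {
      _perStage.put(stageId, mergeNodes(current, incoming));
    } else {
      _mergeFailed.merge(stageId, 1, Integer::sum); // drop the offending payload
    }
  }

  private static boolean sameShape(StageStatsNode a, StageStatsNode b) {
    if (a.getOperatorTypeId() != b.getOperatorTypeId() || a.getChildrenCount() != b.getChildrenCount()) {
      return false;
    }
    for (int i = 0; i < a.getChildrenCount(); i++) {
      if (!sameShape(a.getChildren(i), b.getChildren(i))) {
        return false;
      }
    }
    return true;
  }

  private static StageStatsNode mergeNodes(StageStatsNode a, StageStatsNode b) {
    StageStatsNode.Builder merged = a.toBuilder();
    // Per-node StatMap.merge through the existing API: deserialize both
    // stat_map payloads, merge, re-serialize (helper elided in this sketch).
    for (int i = 0; i < a.getChildrenCount(); i++) {
      merged.setChildren(i, mergeNodes(a.getChildren(i), b.getChildren(i)));
    }
    return merged.build();
  }
}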

Broker threading model

  • Dispatch thread: opens N SubmitWithStream streams (one per server); registers StreamObserver callbacks; returns immediately.
  • gRPC client I/O thread (inbound OpChainComplete, serialized via a per-query lock): the per-query state holds the per-stage Map<Integer, StageStatsNode> accumulator, the outstanding-opchain set, the open-streams set, and the per-stage mergeFailed counter, all guarded by a per-query lock. The gRPC client onNext callback acquires the lock, decodes the payload, merges per-stage trees (tree-shape check + StatMap.merge), removes the opchain from the outstanding set, and releases. The work is short and non-blocking, so doing it on the I/O thread is fine; no new executor is needed.
  • Mailbox receiver thread (existing): drains the data mailbox; on EOS notifies the wait-window scheduler.
  • Wait-window scheduler (single shared ScheduledExecutorService for the broker process): after the data-mailbox EOS, schedules a "deadline reached" task at min(50 ms, remainingTimeout). When the task fires it acquires the per-query lock, finalizes the stage-coverage structure, and resolves the query future. One shared scheduler serves the whole broker; it is not per-query.
  • Cancel-broadcast path: when any server reports an error, the gRPC I/O thread (already holding the per-query lock from the inbound message handling) walks the open-streams set and sends BrokerToServer.cancel via onNext on each. Stream sends are non-blocking.
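
A sketch of the wait-window piece, assuming a hypothetical PerQueryState that owns the per-query lock and the coverage structure:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// One scheduler for the whole broker process, not one per query.
final class StatsWaitWindow {
  private static final long DEFAULT_WAIT_MS = 50;
  private final ScheduledExecutorService _scheduler = Executors.newSingleThreadScheduledExecutor();

  // Called from the mailbox receiver thread on data-side EOS (success or error).
  // Early resolution when the outstanding set empties first is elided.
  void onDataEos(PerQueryState query, long remainingTimeoutMs) {
    long waitMs = Math.min(DEFAULT_WAIT_MS, remainingTimeoutMs);
    _scheduler.schedule(() -> {
      query.lock();
      try {
        // Finalize { responded, mergeFailed, missing } per stage and resolve.
        query.finalizeCoverageAndResolve();
      } finally {
        query.unlock();
      }
    }, waitMs, TimeUnit.MILLISECONDS);
  }
}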

Stats payload format & merging

The new MultiStageStatsTree payload carries the operator tree shape, type, plan-node ids, and an opaque StatMap byte payload per node. The internal merge contract used by MultiStageQueryStats today must keep working for stream-mode payloads:

  • In-memory representation. We keep MultiStageQueryStats and StageStats as the canonical broker-side accumulator types — operator code (which adds entries via Open.addLastOperator(type, statMap)) stays unchanged. We add a tree-shaped projection layer used only at the wire boundary.
  • Encoding (server side). At opchain-completion time we walk the live MultiStageOperator tree in inorder. The i-th visit pulls the i-th entry from StageStats._operatorTypes / _operatorStats. We build a StageStatsNode carrying the type, the StatMap bytes (StatMap.serialize), and the children, attaching the planNodeIds gathered from the tracker.
  • Decoding & merge (broker side). Each incoming OpChainComplete is converted back into a StageStats.Closed per stage by a recursive walk; the broker keeps the per-stage tree in a Map<Integer, StageStatsNode> accumulator. Two payloads for the same stage merge by tree-shape equality + per-node StatMap.merge (the existing API). Tree-shape mismatch (operator type at any position differs, or arity differs) → log + mark mergeFailed for that stage, drop the second payload, never abort the query.
  • One-hop world. Because stream mode is server → broker once, we don't need the legacy "merge bytes without deserializing" optimization. The encoder/decoder above are simple and explicit.
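
A sketch of the encoding walk, zipping the live operator tree with the flat stats entries. FlatEntry, the flat-entry iterator, and the children accessor are assumptions for illustration; the real code reads StageStats through whatever API we expose, and the visit order must match the order in which operators appended their entries:

import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Encoder sketch: pre-order shown for illustration only; the actual walk must
// mirror the append order of StageStats._operatorTypes / _operatorStats.
final class MultiStageStatsTreeEncoderSketch {
  record FlatEntry(int operatorTypeId, byte[] statMapBytes) {}

  static StageStatsNode encode(MultiStageOperator op, Iterator<FlatEntry> flat,
      Map<MultiStageOperator, List<Integer>> opToPlanNodeIds) {
    FlatEntry entry = flat.next();
    StageStatsNode.Builder node = StageStatsNode.newBuilder()
        .setOperatorTypeId(entry.operatorTypeId())
        .setStatMap(ByteString.copyFrom(entry.statMapBytes()));
    node.addAllPlanNodeIds(opToPlanNodeIds.getOrDefault(op, List.of()));
    for (MultiStageOperator child : op.getChildOperators()) {
      node.addChildren(encode(child, flat, opToPlanNodeIds));
    }
    return node.build();
  }
}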

Op → PlanNode mapping

PlanNodeToOpChain already invokes a BiConsumer<PlanNode, MultiStageOperator> tracker as each operator is constructed. Today the mapping is discarded after construction. Capture it into a Map<MultiStageOperator, List<PlanNode>> on OpChainExecutionContext. For the leaf operator the tracker fires once with the leaf-stage boundary node, but the leaf actually represents the whole sub-tree of v1 plan nodes below that boundary — at construction time we walk that sub-tree once and record all PlanNodeIds for the leaf. At MultiStageStatsTree build time, each StageStatsNode reads its planNodeIds from this map. PlanNode.getNodeId() is the stable wire identifier. Cardinality is one-to-many (leaf → many; intermediate operators → one).
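
A sketch of capturing that mapping at construction time (class and field names are illustrative):

import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Retains the mapping the existing PlanNodeToOpChain tracker callback already
// observes; today that callback's data is discarded after construction.
final class OpPlanNodeTracker {
  final Map<MultiStageOperator, List<Integer>> _opToPlanNodeIds = new IdentityHashMap<>();

  BiConsumer<PlanNode, MultiStageOperator> asTracker() {
    return (planNode, op) -> _opToPlanNodeIds
        .computeIfAbsent(op, k -> new ArrayList<>())
        .add(planNode.getNodeId()); // the stable wire identifier
  }
  // For the leaf operator, a one-time walk of the v1 sub-tree below the leaf
  // boundary adds every descendant's node id to the leaf's list (not shown).
}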

Forward-looking constraint: stats must be concurrent-read safe

In this design stats are still only read by the broker after the opchain finishes, so today's "single writer, no sync" property is preserved. However, a future extension is for the broker to poll stats during query execution. To keep the door open, MSE stats objects should be designed (or refactored) so that they can be read concurrently with operator-thread writes — e.g. counters as volatile/AtomicLong or a snapshot-on-demand publication discipline. This issue does not implement live polling, but it should not introduce any change that prevents it.
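
For example, a per-operator counter stays safe under a future live poll if it is published through an AtomicLong (illustrative only, not a proposed class):

import java.util.concurrent.atomic.AtomicLong;

// Single writer (operator thread), concurrent readers (future broker poll path).
final class ConcurrentReadableStat {
  private final AtomicLong _numRows = new AtomicLong();

  void addRows(long n) {          // operator thread, hot path
    _numRows.addAndGet(n);
  }

  long numRowsSnapshot() {        // safe to call mid-execution
    return _numRows.get();
  }
}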

Configuration

Two-level opt-in:

  • Cluster level: a config key (e.g. pinot.multistage.engine.statsReportingMode) with values legacy / stream.
  • Per-query: a query option that overrides the cluster setting.
  • Conflict resolution: query-level wins.

Default for the first release: legacy. Flip to stream once it has bake time.
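
The resolution logic in miniature (enum and method names follow the examples above; nothing here is final):

// Query-level option wins over the cluster config; unknown values fall back to legacy.
final class StatsReportingModeResolver {
  enum Mode { LEGACY, STREAM }

  static Mode resolve(String clusterConfigValue, String queryOptionValue) {
    String effective = queryOptionValue != null ? queryOptionValue : clusterConfigValue;
    return "stream".equalsIgnoreCase(effective) ? Mode.STREAM : Mode.LEGACY;
  }
}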

Backward compatibility & mixed-version handling

  • The existing unary Submit and Cancel RPCs stay in place and unchanged.
  • Mode legacy is byte-for-byte the existing behavior.
  • Mode stream requires both broker and server to support the new RPC. The broker probes by attempting SubmitWithStream; if the server returns UNIMPLEMENTED, the dispatcher falls back to legacy for that query.
  • When mode is stream, the server emits EOS without stats on the mailbox.
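
The probe is the standard gRPC status check on the stream's error path; dispatchLegacy and failQuery stand in for the broker's real hooks:

import io.grpc.Status;
import io.grpc.stub.StreamObserver;

// Broker-side response observer sketch: fall back to unary Submit/Cancel when
// the server predates the new RPC.
StreamObserver<ServerToBroker> responseObserver = new StreamObserver<ServerToBroker>() {
  @Override
  public void onNext(ServerToBroker msg) { /* accumulate opchain stats */ }

  @Override
  public void onCompleted() { /* ServerDone path */ }

  @Override
  public void onError(Throwable t) {
    if (Status.fromThrowable(t).getCode() == Status.Code.UNIMPLEMENTED) {
      dispatchLegacy(request);  // hypothetical: retry this query via unary Submit
    } else {
      failQuery(t);             // hypothetical: normal error handling
    }
  }
};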

Replacing the "skip stats during upgrade" mode

Today there is a mode in which servers do not send stats at all, used during rolling upgrades because servers on different versions may produce stat objects the broker can't merge. In stream mode this concern moves entirely to the broker:

  • Servers always send stats. The "skip stats" toggle is not used in stream mode.
  • The broker tolerates version discrepancies. If the broker receives a MultiStageStatsTree it cannot decode (unknown operator type id, malformed StatMap bytes) or cannot merge with what it has already accumulated for that stage (tree shape mismatch — operator type at any position differs, or children count differs):
    1. Logs the failure with stage id, worker id, and the operator-type-id sequence of both trees.
    2. Marks that opchain as "responded but not merged" in the per-stage coverage structure.
    3. Continues processing other OpChainComplete messages — never aborts the query because of a stats merge problem.
  • The proto's natural extensibility (unknown fields are tolerated, new optional fields can be added) means most cross-version evolutions are non-breaking; only operator-type-id additions or StatMap schema changes need this safety net.
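
The never-abort discipline is a try/catch at the decode boundary; the accumulator, coverage, and logger hooks below are illustrative:

// Never abort the query over a stats problem: decode failures become
// "responded but not merged" in the per-stage coverage structure.
void onOpChainComplete(OpChainComplete msg) {
  try {
    StageStats.Closed decoded =
        MultiStageStatsTreeDecoder.decode(msg.getStats().getCurrentStage());
    _accumulator.merge(msg.getStageId(), decoded);
  } catch (RuntimeException e) { // unknown operator type id, malformed StatMap bytes
    LOGGER.warn("Stats decode/merge failed for stage {} worker {}",
        msg.getStageId(), msg.getWorkerId(), e);
    _coverage.markRespondedNotMerged(msg.getStageId(), msg.getWorkerId());
  }
}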

Critical files

  • pinot-common/src/main/proto/worker.proto — new RPC + messages
  • pinot-query-runtime/.../service/server/QueryServer.java — SubmitWithStream handler, opchain completion callback, cancel-via-stream
  • pinot-query-runtime/.../runtime/plan/PlanNodeToOpChain.java — populate the OpChainExecutionContext op→PlanNode map; walk the leaf sub-tree to record the leaf's full plan-node list
  • pinot-query-runtime/.../runtime/plan/OpChainExecutionContext.java — store the op→PlanNode map
  • pinot-query-runtime/.../runtime/operator/MultiStageOperator.java — expose the children list shape used by the inorder walk at encoding time
  • pinot-query-runtime/.../runtime/plan/MultiStageStatsTreeEncoder.java (new) — walks live operator tree + MultiStageQueryStats flat lists to build a MultiStageStatsTree proto; opposite direction MultiStageStatsTreeDecoder produces a per-stage StageStats.Closed for the broker accumulator
  • pinot-query-runtime/.../runtime/blocks/MseBlock.java — confirm EOS-without-stats path is reachable when mode=stream
  • Broker dispatcher (pinot-broker MSE dispatch path) — open the new stream, accumulate OpChainComplete, wait window, partial-coverage report
  • MultiStageQueryStats — add a tree-aware merge entrypoint plus per-stage coverage structure (responded / mergeFailed / missing); legacy serialization path stays untouched
  • pinot-query-runtime/.../runtime/InStageStatsTreeBuilder.java — unchanged. Continues to be used by legacy mode. A separate consumer reads the structured tree directly when stream mode is on.
  • Config: CommonConstants and the place that resolves cluster + query-option config

Phasing

  1. Phase A — SubmitWithStream RPC + structured MultiStageStatsTree payload. New bidi RPC alongside the existing Submit / Cancel. Server-side encoder + opchain completion callback. Broker-side per-stage tree accumulator, wait window, fan-out cancel, mergeFailed accounting. Cancel still routes through today's separate Cancel RPC. Cluster + per-query flag.

  2. Phase B — Fold cancel onto the stream. Broker can send BrokerToServer.cancel; broker-initiated stream close also acts as cancel. Decommission the separate Cancel unary RPC after one release of overlap.

Each phase is independently shippable and feature-flagged. The legacy mailbox-piggyback path is left untouched throughout.

Suggested commit / PR boundaries within a single feature branch:

  • A.0: proto definitions (SubmitWithStream, BrokerToServer, ServerToBroker, OpChainComplete, MultiStageStatsTree, StageStatsNode) + generated code, no behavior changes.
  • A.1: capture op→PlanNode map on OpChainExecutionContext; leaf sub-tree walk; MultiStageStatsTreeEncoder / MultiStageStatsTreeDecoder + mergeFailed semantics + unit tests.
  • A.2: server-side SubmitWithStream handler in QueryServer, opchain completion callback, EOS-without-stats when stream mode is on.
  • A.3: broker-side dispatcher uses the new RPC under the cluster/per-query flag, threading model, wait window, fan-out cancel.
  • B: cancel via the stream (broker-initiated message + close-as-cancel), keep the legacy Cancel RPC for one release.

Backout discipline. Each phase is feature-flagged and the underlying RPCs / payloads are additive. A regression in Phase A can be turned off via the cluster config (statsReportingMode = legacy) without rolling back code; Phase B reverts cleanly because the legacy Cancel RPC is kept alongside.

Verification

  • Unit tests for the new encoder/decoder:
    • MultiStageStatsTreeEncoder round-trip: build a MultiStageQueryStats for a representative tree (transforms, joins, N-ary set op with N≥3 children, leaf operator with a multi-PlanNode sub-tree), encode, decode, assert equality of types / planNodeIds / StatMap contents.
    • Merge of two equivalent payloads → counters summed, planNodeIds preserved.
    • Merge of two payloads with mismatched tree shape → mergeFailed flag set, query continues.
  • Integration tests in pinot-integration-tests:
    • Successful query with mode=stream: broker sees full stats with non-empty op → PlanNode mapping; tree shape matches the dispatched plan exactly.
    • Query with an N-ary set op: regression coverage for the tree shape that today's InStageStatsTreeBuilder heuristic loses; correct under stream mode.
    • Query with intentional server-side error on one server: error stats arrive on the new stream; broker fans out BrokerToServer.cancel to all peer servers (Phase B); broker reports complete coverage including the cancelled servers' partial stats.
    • Query timeout with one stage stuck: wait window expires; broker reports per-stage coverage with missing>0.
    • Cluster=legacy, query-option=stream: per-query override works (and vice versa).
    • Mixed-version: new broker, old server → broker falls back to unary Submit/Cancel cleanly.
    • Mixed-version stats merge: a MultiStageStatsTree containing an unknown operator_type_id from a newer server arrives at an older broker; broker logs, marks mergeFailed, query completes successfully.
  • Run a representative MSE benchmark in legacy and stream modes; confirm no regression in the success path.
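
As a rough shape for the first round-trip unit test (fixtures and accessors are hypothetical; the encoder/decoder are the new classes proposed above):

// Shape only: round-trip an N-ary set op (3 inputs), the case the legacy
// heuristic cannot reconstruct.
@Test
public void testStatsTreeRoundTripNArySetOp() {
  MultiStageOperator root = buildTreeWithThreeWaySetOp();   // hypothetical fixture
  MultiStageQueryStats stats = runAndCollectStats(root);    // hypothetical fixture
  MultiStageStatsTree proto =
      MultiStageStatsTreeEncoder.encode(root, stats, planNodeMap());
  StageStats.Closed decoded =
      MultiStageStatsTreeDecoder.decode(proto.getCurrentStage());
  // Assert operator types, planNodeIds, and StatMap contents survived
  // the round trip (assertion framework elided).
}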

Alternatives considered

The original design doc enumerated four candidate solutions:

  • S1 — Periodic stats blocks (broker always knows current stats; rejected as larger change with no clear win when the broker can simply pull on demand).
  • S2 — Cache on any error (smaller change, partly implemented in "Use stats cache on error instead of the chained mechanism" (#15992); keeps the server cache but uses it more reliably; rejected because it doesn't solve the tree-reconstruction problem and keeps the cache memory cost).
  • S3 — Workers push stats directly to broker (the family this issue belongs to).
  • S4 — Grace period on errors (minimal code, but latency on error scales with chain depth and still loses stats sometimes; rejected).

This issue is an evolution of S3: instead of a brand-new RPC dedicated to stats, evolve the existing plan-submission RPC into a long-lived bidi stream that carries plan submission, stats push, and (later) cancel.

A future extension — broker-initiated polling for live progress during query execution — is left out of scope for this issue but the design preserves the room to add it later (see "Forward-looking constraint" above).


Previous version (superseded)

The first revision of this proposal had a 3-phase plan and reused today's flat-binary stats payload format:

  • Phase 1 (dropped) would have shipped the operator → PlanNode mapping inside today's per-stage MultiStageQueryStats byte payload, additively, and extended InStageStatsTreeBuilder to consume it when present. The thinking was that legacy-mode queries would benefit from a tree-reconstruction fix immediately, in advance of the new RPC.
  • Phase 2 was the SubmitWithStream RPC, but OpChainComplete.stats was specified as opaque bytes (the existing StageStats.serialize format) plus a sibling repeated OpPlanNodeMapping mapping field on OpChainComplete. Stats were going to be smuggled through the same flat-binary encoding the legacy mailbox path uses.

It was revised after observing two things:

  1. Server → broker is a single hop in stream mode. The "merge stats bytes without deserializing" optimization that drove the flat-binary format earned its complexity back when stats were copied through many mailboxes downstream — that's no longer the case here.
  2. Set operations are still N-ary at runtime (PlanNodeToOpChain.visitSetOp iterates setOpNode.getInputs() with no fixed arity). The existing inorder + implicit-arity scheme can't uniquely reconstruct the tree for set ops with 3+ inputs, and adding the mapping field alone doesn't fix that — we also need explicit tree shape on the wire.

The revised proposal above replaces the opaque-bytes-plus-mapping approach with a structured MultiStageStatsTree proto message that carries the tree shape, types, plan-node ids, and per-node StatMap bytes natively. Phase 1 was dropped because smuggling tree shape into the legacy format is throwaway work once the new format exists; legacy mode keeps using its existing format and InStageStatsTreeBuilder heuristic unchanged.


Labels

PEP-Request: Pinot Enhancement Proposal request to be reviewed.
multi-stage: Related to the multi-stage query engine.
