From 24d4d5472874fae4604851f0a24d0307a9dda418 Mon Sep 17 00:00:00 2001 From: Kartik Khare Date: Mon, 1 Jun 2026 19:25:01 +0530 Subject: [PATCH] Add WorkerManager extension points for customizing leaf-stage segment assignment Subclasses can already customize multi-stage worker selection by overriding getCandidateServers / getCandidateServersForReplicatedLeaf, but the per-worker segment assignment is finalized internally with no extension point. This adds two protected no-op hooks invoked once the assignment is built: filterLeafStageSegments (from updateContextForLeafStage, covering the non-partitioned, partitioned and logical-table paths) and filterReplicatedLeafStageSegments (from setSegmentsForReplicatedLeafFragment). A subclass can rewrite DispatchablePlanMetadata's worker/replicated segment maps via the existing setters. Both default to no-ops, so behavior is unchanged. --- .../org/apache/pinot/query/routing/WorkerManager.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java index c658062f1ed2..d7b6383a3bf9 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java @@ -508,6 +508,7 @@ private void assignWorkersToLeafFragment(PlanFragment fragment, DispatchablePlan } private void updateContextForLeafStage(DispatchablePlanMetadata metadata, DispatchablePlanContext context) { + filterLeafStageSegments(context, metadata); if (context.isUseLeafServerForIntermediateStage()) { Map workerIdToServerInstanceMap = metadata.getWorkerIdToServerInstanceMap(); assert workerIdToServerInstanceMap != null; @@ -735,6 +736,15 @@ private void setSegmentsForReplicatedLeafFragment(DispatchablePlanMetadata metad // TODO: Support unavailable segments and optional segments for replicated leaf stage metadata.setReplicatedSegments(segmentsMap); + filterReplicatedLeafStageSegments(context, metadata); + } + + /** Extension point to filter the non-replicated leaf-stage per-worker segment assignment; no-op by default. */ + protected void filterLeafStageSegments(DispatchablePlanContext context, DispatchablePlanMetadata metadata) { + } + + /** Extension point to filter the replicated leaf-stage segments; no-op by default. */ + protected void filterReplicatedLeafStageSegments(DispatchablePlanContext context, DispatchablePlanMetadata metadata) { } /**