diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java index d89e18f7ff31..8f7578c8ad4a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/DispatchablePlanVisitor.java @@ -140,7 +140,9 @@ public Void visitProject(ProjectNode node, DispatchablePlanContext context) { public Void visitSort(SortNode node, DispatchablePlanContext context) { node.getInputs().get(0).visit(this, context); DispatchablePlanMetadata dispatchablePlanMetadata = getOrCreateDispatchablePlanMetadata(node, context); - dispatchablePlanMetadata.setRequireSingleton(!node.getCollations().isEmpty() && node.getOffset() != -1); + // Final sort (receives from sort exchange) needs singleton worker + boolean isFinalSort = node.getInputs().get(0) instanceof MailboxReceiveNode; + dispatchablePlanMetadata.setRequireSingleton(isFinalSort); return null; } diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java index 6f204f9b7eec..71cea3923e0b 100644 --- a/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java +++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/QueryCompilationTest.java @@ -687,6 +687,45 @@ public void testLargeIn() { _queryEnvironment.planQuery(query); } + /** + * Tests that queries with ORDER BY / LIMIT use singleton worker for the intermediate sort stage. + */ + @Test + public void testSingletonWorkerForLimitAndOrderByQueries() { + String[] queries = + new String[]{"SELECT * FROM a LIMIT 10", "SELECT * FROM a OFFSET 10", "SELECT * FROM a ORDER BY col1", + "SELECT * FROM a LIMIT 10 OFFSET 5", "SELECT * FROM a ORDER BY col1 LIMIT 10", "SELECT * FROM a ORDER BY " + + "col1 OFFSET 10", "SELECT * FROM a ORDER BY col1 LIMIT 10 OFFSET 5"}; + + for (String query : queries) { + DispatchableSubPlan dispatchableSubPlan = _queryEnvironment.planQuery(query); + + // Find the intermediate stage (non-leaf, non-root) + DispatchablePlanFragment intermediateStage = findIntermediateStage(dispatchableSubPlan); + assertNotNull(intermediateStage, "Should have an intermediate stage"); + + // Should use singleton worker (1 server, 1 worker) + assertEquals(intermediateStage.getServerInstanceToWorkerIdMap().size(), 1, + "LIMIT / ORDER BY query should use singleton worker for intermediate stage"); + assertEquals(intermediateStage.getWorkerMetadataList().size(), 1, + "LIMIT / ORDER BY query should use singleton worker for intermediate stage"); + } + } + + /** + * Helper method to find an intermediate stage (non-leaf, non-root). + */ + private DispatchablePlanFragment findIntermediateStage(DispatchableSubPlan dispatchableSubPlan) { + for (DispatchablePlanFragment fragment : dispatchableSubPlan.getQueryStages()) { + int stageId = fragment.getPlanFragment().getFragmentId(); + // Skip stage 0 (broker/root) and leaf stages (have table names) + if (stageId > 0 && fragment.getTableName() == null) { + return fragment; + } + } + return null; + } + // -------------------------------------------------------------------------- // Test Utils. // --------------------------------------------------------------------------