diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index e8bab076e1d01..00443a553da94 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -244,21 +244,24 @@ public void execute( ) ) { runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute()); - } - for (PhysicalPlan subplan : subplans) { - var childSessionId = newChildSession(sessionId); - ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize()); - // funnel sub plan pages into the main plan exchange source - mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); - executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> { - exchangeSink.addCompletionListener( - ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) - ); - }, e -> { - exchangeService.finishSinkHandler(childSessionId, e); - finalListener.onFailure(e); - }), () -> exchangeSink.createExchangeSink(() -> {})); + for (PhysicalPlan subplan : subplans) { + var childSessionId = newChildSession(sessionId); + ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize()); + // funnel sub plan pages into the main plan exchange source + mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); + var subPlanListener = localListener.acquireCompute(); + + executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> { + exchangeSink.addCompletionListener( + ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) + ); + subPlanListener.onResponse(result.completionInfo()); + }, e -> { + exchangeService.finishSinkHandler(childSessionId, e); + subPlanListener.onFailure(e); + }), () -> exchangeSink.createExchangeSink(() -> {})); + } } } }