From 5498905e72ae5d9334e596d492eb8986c7a92602 Mon Sep 17 00:00:00 2001 From: "ievgen.degtiarenko" Date: Thu, 15 May 2025 08:57:44 +0200 Subject: [PATCH] Simplify exchange release logic --- .../xpack/esql/plugin/DataNodeComputeHandler.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 2c1677e012078..e423cd43f8f9b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -248,9 +248,6 @@ private class DataNodeRequestExecutor { } void start() { - parentTask.addListener( - () -> exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(parentTask.getReasonCancelled())) - ); runBatch(0); } @@ -419,7 +416,12 @@ private void runComputeOnDataNode( var parentListener = computeListener.acquireAvoid(); try { // run compute with target shards + var externalSink = exchangeService.getSinkHandler(externalId); var internalSink = exchangeService.createSinkHandler(request.sessionId(), request.pragmas().exchangeBufferSize()); + task.addListener(() -> { + exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())); + exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(task.getReasonCancelled())); + }); DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor( request, task, @@ -431,10 +433,6 @@ private void runComputeOnDataNode( ); dataNodeRequestExecutor.start(); // run the node-level reduction - var externalSink = exchangeService.getSinkHandler(externalId); - task.addListener( - () -> exchangeService.finishSinkHandler(externalId, new TaskCancelledException(task.getReasonCancelled())) - ); var exchangeSource = new ExchangeSourceHandler(1, esqlExecutor); exchangeSource.addRemoteSink(internalSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); var reductionListener = computeListener.acquireCompute();