diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java index 88403083c54e39..54fe0dff977f59 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ResultReceiverConsumer.java @@ -82,15 +82,18 @@ public ResultReceiverConsumer(List resultReceivers, long timeout ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i); contexts.add(context); } - this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size()); + this.readyOffsets = new ArrayBlockingQueue<>(Math.max(1, resultReceivers.size())); timeoutTs = timeoutDeadline; } public boolean isEos() { - return finishedReceivers == contexts.size(); + return !contexts.isEmpty() && finishedReceivers == contexts.size(); } public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException { + if (contexts.isEmpty()) { + throw new UserException("There is no receiver."); + } if (!futureInitialized) { futureInitialized = true; for (ReceiverContext context : contexts) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java index 4ab7f041f591f3..88ceb432c2eb39 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/QueryProcessor.java @@ -86,10 +86,14 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext) { } boolean regenerateInstanceId = coordinatorContext.connectContext.consumeNeedRegenerateQueryId(); + boolean returnResultFromLocal = coordinatorContext.connectContext.isReturnResultFromLocal(); for (AssignedJob topInstance : distinctWorkerJobs.values()) { if (regenerateInstanceId) { topInstance.resetInstanceId(coordinatorContext.connectContext.nextInstanceId()); } + if (!returnResultFromLocal) { + continue; + } DistributedPlanWorker topWorker = topInstance.getAssignedWorker(); TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort()); receivers.add( diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java index e67069209a0bea..3f76d4227b0614 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ResultReceiverConsumerTest.java @@ -40,6 +40,16 @@ public class ResultReceiverConsumerTest { @Injectable private ResultReceiver receiver3; + @Test + public void testEmptyReceiversForRemoteResult() throws Exception { + ResultReceiverConsumer consumer = new ResultReceiverConsumer(Lists.newArrayList(), + System.currentTimeMillis() + 3600); + Status status = new Status(); + + Assert.assertFalse(consumer.isEos()); + Assertions.assertThrows(UserException.class, () -> consumer.getNext(status)); + } + @Test public void testEosHandling() throws Exception { ResultReceiverConsumer consumer = new ResultReceiverConsumer(