Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,18 @@ public ResultReceiverConsumer(List<ResultReceiver> resultReceivers, long timeout
ReceiverContext context = new ReceiverContext(resultReceivers.get(i), i);
contexts.add(context);
}
this.readyOffsets = new ArrayBlockingQueue<>(resultReceivers.size());
this.readyOffsets = new ArrayBlockingQueue<>(Math.max(1, resultReceivers.size()));
timeoutTs = timeoutDeadline;
}

public boolean isEos() {
return finishedReceivers == contexts.size();
return !contexts.isEmpty() && finishedReceivers == contexts.size();
}

public RowBatch getNext(Status status) throws TException, InterruptedException, ExecutionException, UserException {
if (contexts.isEmpty()) {
throw new UserException("There is no receiver.");
}
if (!futureInitialized) {
futureInitialized = true;
for (ReceiverContext context : contexts) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,14 @@ public static QueryProcessor build(CoordinatorContext coordinatorContext) {
}

boolean regenerateInstanceId = coordinatorContext.connectContext.consumeNeedRegenerateQueryId();
boolean returnResultFromLocal = coordinatorContext.connectContext.isReturnResultFromLocal();
for (AssignedJob topInstance : distinctWorkerJobs.values()) {
if (regenerateInstanceId) {
topInstance.resetInstanceId(coordinatorContext.connectContext.nextInstanceId());
}
if (!returnResultFromLocal) {
continue;
}
DistributedPlanWorker topWorker = topInstance.getAssignedWorker();
TNetworkAddress execBeAddr = new TNetworkAddress(topWorker.host(), topWorker.brpcPort());
receivers.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,16 @@ public class ResultReceiverConsumerTest {
@Injectable
private ResultReceiver receiver3;

@Test
public void testEmptyReceiversForRemoteResult() throws Exception {
ResultReceiverConsumer consumer = new ResultReceiverConsumer(Lists.newArrayList(),
System.currentTimeMillis() + 3600);
Status status = new Status();

Assert.assertFalse(consumer.isEos());
Assertions.assertThrows(UserException.class, () -> consumer.getNext(status));
}

@Test
public void testEosHandling() throws Exception {
ResultReceiverConsumer consumer = new ResultReceiverConsumer(
Expand Down
Loading