Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
c61ebba
cherry pick
Cpaulyz Feb 24, 2025
786f907
update example
Cpaulyz Feb 24, 2025
c036ed2
fix Source handle is blocked problem in MergeSortJoinOperator
Beyyes Feb 25, 2025
689c1ce
fix exchange UT framework
Cpaulyz Feb 26, 2025
d3e2ab5
fix UT
Cpaulyz Feb 26, 2025
3eb8929
add more logs for operator
Beyyes Mar 5, 2025
28aa5ab
Merge branch 'master' of github.com:apache/iotdb
Beyyes Mar 7, 2025
6f01cba
Merge branch 'master' of github.com:apache/iotdb
Beyyes Mar 10, 2025
816932b
Merge branch 'master' into parallel_multi_child_node
Beyyes Mar 10, 2025
5aded39
Merge remote-tracking branch 'origin/master' into parallel_multi_chil…
JackieTien97 Mar 10, 2025
2ff8357
Fix SortTest UT
JackieTien97 Mar 10, 2025
839b886
Merge branch 'parallel_multi_child_node' of github.com:apache/iotdb i…
Beyyes Mar 10, 2025
831dce5
fix join test and subquery test
Beyyes Mar 10, 2025
d2d9c49
Fix AnalyzerTest and LimitOffsetPushDownTest
Wei-hao-Li Mar 11, 2025
e074834
Merge branch 'master' of github.com:apache/iotdb
Beyyes Mar 11, 2025
edcd997
Merge branch 'master' into parallel_multi_child_node
Beyyes Mar 11, 2025
195b8c8
fix subquery ut
Beyyes Mar 11, 2025
2e9a795
Merge branch 'parallel_multi_child_node' of github.com:apache/iotdb i…
Beyyes Mar 11, 2025
a442fea
try fix IT
Cpaulyz Mar 11, 2025
4a02d39
fix subquery it
Beyyes Mar 11, 2025
f9e90c2
Merge branch 'parallel_multi_child_node' of github.com:apache/iotdb i…
Beyyes Mar 11, 2025
27624c0
Resolve conflict
Cpaulyz Mar 12, 2025
481a896
fix UT
Cpaulyz Mar 12, 2025
acd6628
Merge remote-tracking branch 'origin/master' into parallel_multi_chil…
JackieTien97 Mar 12, 2025
5e693f7
Try Fix Exception Unstable
JackieTien97 Mar 12, 2025
cb37f57
Try fix CI
JackieTien97 Mar 13, 2025
246482d
Try fix CI
JackieTien97 Mar 13, 2025
58a82af
Try fix CI
JackieTien97 Mar 13, 2025
4fa5b2f
Merge remote-tracking branch 'origin/master' into parallel_multi_chil…
JackieTien97 Mar 13, 2025
594cfe8
Try fix CI
JackieTien97 Mar 13, 2025
0744324
Try fix CI
JackieTien97 Mar 13, 2025
d53900a
Try fix CI
JackieTien97 Mar 13, 2025
f2d8342
Try fix CI
JackieTien97 Mar 13, 2025
7d15b42
Try fix CI
JackieTien97 Mar 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.db.queryengine.execution.driver;

import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink;
Expand Down Expand Up @@ -244,13 +245,17 @@ private ListenableFuture<?> processInternal() {
}
return NOT_BLOCKED;
} catch (Throwable t) {
Throwable actualCause = t;
if (actualCause.getCause() instanceof IoTDBRuntimeException) {
actualCause = actualCause.getCause();
}
List<StackTraceElement> interrupterStack = exclusiveLock.getInterrupterStack();
if (interrupterStack == null) {
driverContext.failed(t);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
driverContext.failed(actualCause);
if (actualCause instanceof RuntimeException) {
throw (RuntimeException) actualCause;
} else {
throw new RuntimeException(t);
throw new RuntimeException(actualCause);
}
}

Expand All @@ -260,7 +265,7 @@ private ListenableFuture<?> processInternal() {
Exception exception = new Exception("Interrupted By");
exception.setStackTrace(interrupterStack.toArray(new StackTraceElement[0]));
RuntimeException newException = new RuntimeException("Driver was interrupted", exception);
newException.addSuppressed(t);
newException.addSuppressed(actualCause);
driverContext.failed(newException);
throw newException;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import javax.annotation.concurrent.NotThreadSafe;

import java.util.LinkedList;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -82,6 +83,8 @@ public class SharedTsBlockQueue {
private long maxBytesCanReserve =
IoTDBDescriptor.getInstance().getMemoryConfig().getMaxBytesPerFragmentInstance();

private volatile Throwable abortedCause = null;

// used for SharedTsBlockQueue listener
private final ExecutorService executorService;

Expand Down Expand Up @@ -179,6 +182,9 @@ public void setNoMoreTsBlocks(boolean noMoreTsBlocks) {
public TsBlock remove() {
if (closed) {
// try throw underlying exception instead of "Source handle is aborted."
if (abortedCause != null) {
throw new IllegalStateException(abortedCause);
}
try {
blocked.get();
} catch (InterruptedException e) {
Expand Down Expand Up @@ -342,6 +348,7 @@ public void abort(Throwable t) {
if (closed) {
return;
}
abortedCause = t;
closed = true;
if (!blocked.isDone()) {
blocked.setException(t);
Expand All @@ -364,4 +371,8 @@ public void abort(Throwable t) {
bufferRetainedSizeInBytes = 0;
}
}

public Optional<Throwable> getAbortedCause() {
return Optional.ofNullable(abortedCause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ public interface ISinkChannel extends ISink {

/** Return the number of TsBlocks the channel has in buffer. */
int getNumOfBufferedTsBlocks();

void checkState();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
Expand Down Expand Up @@ -231,9 +232,25 @@ public SharedTsBlockQueue getSharedTsBlockQueue() {
return queue;
}

private void checkState() {
@Override
public void checkState() {
if (aborted) {
throw new IllegalStateException("LocalSinkChannel is aborted.");
Optional<Throwable> abortedCause = queue.getAbortedCause();
if (abortedCause.isPresent()) {
throw new IllegalStateException(abortedCause.get());
}
if (queue.isBlocked().isDone()) {
// try throw underlying exception
try {
queue.isBlocked().get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
} catch (ExecutionException e) {
throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
}
}
throw new IllegalStateException("LocalSinkChannel is ABORTED.");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,9 @@ public long ramBytesUsed() {

private void checkState() {
if (aborted) {
for (ISinkChannel channel : downStreamChannelList) {
channel.checkState();
}
throw new IllegalStateException("ShuffleSinkHandle is aborted.");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ public String toString() {
localFragmentInstanceId.instanceId);
}

private void checkState() {
@Override
public void checkState() {
if (aborted) {
throw new IllegalStateException("SinkChannel is aborted.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
Expand Down Expand Up @@ -252,7 +253,11 @@ public void close() {
}

private void checkState() {
if (aborted) {
if (aborted || closed) {
Optional<Throwable> abortedCause = queue.getAbortedCause();
if (abortedCause.isPresent()) {
throw new IllegalStateException(abortedCause.get());
}
if (queue.isBlocked().isDone()) {
// try throw underlying exception instead of "Source handle is aborted."
try {
Expand All @@ -264,9 +269,8 @@ private void checkState() {
throw new IllegalStateException(e.getCause() == null ? e : e.getCause());
}
}
throw new IllegalStateException("Source handle is aborted.");
} else if (closed) {
throw new IllegalStateException("Source Handle is closed.");
throw new IllegalStateException(
"LocalSinkChannel state is ." + (aborted ? "ABORTED" : "CLOSED"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
Expand All @@ -66,6 +67,8 @@

import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.getRootCause;
import static org.apache.iotdb.rpc.TSStatusCode.DATE_OUT_OF_RANGE;

public class FragmentInstanceContext extends QueryContext {

Expand Down Expand Up @@ -322,23 +325,6 @@ public List<FragmentInstanceFailureInfo> getFailureInfoList() {
.collect(Collectors.toList());
}

public Optional<TSStatus> getErrorCode() {
return stateMachine.getFailureCauses().stream()
.filter(e -> e instanceof IoTDBException || e instanceof IoTDBRuntimeException)
.findFirst()
.flatMap(
t -> {
TSStatus status;
if (t instanceof IoTDBException) {
status = new TSStatus(((IoTDBException) t).getErrorCode());
} else {
status = new TSStatus(((IoTDBRuntimeException) t).getErrorCode());
}
status.setMessage(t.getMessage());
return Optional.of(status);
});
}

public void finished() {
stateMachine.finished();
}
Expand Down Expand Up @@ -384,7 +370,8 @@ public FragmentInstanceInfo getInstanceInfo() {
List<FragmentInstanceFailureInfo> failureInfoList = new ArrayList<>();
TSStatus status = null;

for (Throwable failure : failures) {
for (Throwable t : failures) {
Throwable failure = getRootCause(t);
if (failureCause.isEmpty()) {
failureCause = failure.getMessage();
}
Expand All @@ -395,6 +382,11 @@ public FragmentInstanceInfo getInstanceInfo() {
} else if (failure instanceof IoTDBRuntimeException) {
status = new TSStatus(((IoTDBRuntimeException) failure).getErrorCode());
status.setMessage(failure.getMessage());
} else if (failure instanceof DateTimeParseException) {
status = new TSStatus(DATE_OUT_OF_RANGE.getStatusCode());
status.setMessage(failure.getMessage());
} else {
LOGGER.warn("[Unknown exception]: ", failure);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,7 @@ public FragmentInstanceState getInstanceState() {
}

public FragmentInstanceInfo getInstanceInfo() {
return context
.getErrorCode()
.map(
s ->
new FragmentInstanceInfo(
stateMachine.getState(),
context.getEndTime(),
context.getFailedCause(),
context.getFailureInfoList(),
s))
.orElseGet(
() ->
new FragmentInstanceInfo(
stateMachine.getState(),
context.getEndTime(),
context.getFailedCause(),
context.getFailureInfoList()));
return context.getInstanceInfo();
}

public long getStartTime() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

package org.apache.iotdb.db.queryengine.execution.fragment;

import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
Expand Down Expand Up @@ -52,7 +51,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -190,7 +188,6 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
instance.isExplainAnalyze(),
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
// deal with
if (t instanceof IllegalStateException
&& TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
Expand All @@ -205,6 +202,7 @@ public FragmentInstanceInfo execDataQueryFragmentInstance(
logger.warn("error when create FragmentInstanceExecution.", t);
stateMachine.failed(t);
}
clearFIRelatedResources(instanceId);
return null;
}
});
Expand Down Expand Up @@ -287,7 +285,6 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance(
false,
exchangeManager);
} catch (Throwable t) {
clearFIRelatedResources(instanceId);
// deal with
if (t instanceof IllegalStateException
&& TOO_MANY_CONCURRENT_QUERIES_ERROR_MSG.equals(t.getMessage())) {
Expand All @@ -302,6 +299,7 @@ public FragmentInstanceInfo execSchemaQueryFragmentInstance(
logger.warn("Execute error caused by ", t);
stateMachine.failed(t);
}
clearFIRelatedResources(instanceId);
return null;
}
});
Expand Down Expand Up @@ -391,23 +389,7 @@ public TFetchFragmentInstanceStatisticsResp getFragmentInstanceStatistics(

private FragmentInstanceInfo createFailedInstanceInfo(FragmentInstanceId instanceId) {
FragmentInstanceContext context = instanceContext.get(instanceId);
Optional<TSStatus> errorCode = context.getErrorCode();
return errorCode
.map(
tsStatus ->
new FragmentInstanceInfo(
FragmentInstanceState.FAILED,
context.getEndTime(),
context.getFailedCause(),
context.getFailureInfoList(),
tsStatus))
.orElseGet(
() ->
new FragmentInstanceInfo(
FragmentInstanceState.FAILED,
context.getEndTime(),
context.getFailedCause(),
context.getFailureInfoList()));
return context.getInstanceInfo();
}

private void removeOldInstances() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class CollectOperator implements ProcessOperator {

private final OperatorContext operatorContext;
private final List<Operator> children;
private boolean inited = false;

private int currentIndex;

Expand Down Expand Up @@ -68,6 +69,12 @@ private void closeCurrentChild(int index) throws Exception {

@Override
public ListenableFuture<?> isBlocked() {
if (!inited) {
inited = true;
for (Operator child : children) {
child.isBlocked();
}
}
if (currentIndex >= children.size()) {
return NOT_BLOCKED;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,9 @@ protected void gotCandidateBlocks() throws Exception {
if (cachedNextRightBlock != null) {
addRightBlockWithMemoryReservation(cachedNextRightBlock);
cachedNextRightBlock = null;
tryCacheNextRightTsBlock();
if (rightChild.isBlocked().isDone()) {
tryCacheNextRightTsBlock();
}
} else {
if (rightChild.hasNextWithTimer()) {
TsBlock block = rightChild.nextWithTimer();
Expand Down
Loading
Loading