Skip to content

Commit

Permalink
[bugfix](wg)refactor query queue for robustness (apache#37642)
Browse files Browse the repository at this point in the history
## Proposed changes
To simplify QueryQueue's code for robustness, redefine QueryQueue's
usage in two steps:
1. Use QueryQueue.getToken to get a token; the token's state may then be
either running or queued;
2. Release the QueueToken in Coordinator.close: either decrement
runningQueryNum or remove the token from the waiting queue.
We only need to keep these two steps atomic; then it does not matter
whether QueueToken.get succeeds or throws an exception.
So this PR removes the ```removeToken``` method and just releases the
QueueToken in ```coord.close```.


imported:  apache#35929
  • Loading branch information
wangbo committed Jul 11, 2024
1 parent 0a51fea commit 057d1f4
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 16 deletions.
12 changes: 9 additions & 3 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,16 +660,22 @@ public void exec() throws Exception {

/**
 * Releases what this coordinator holds: hands {@code queueToken} back to
 * {@code queryQueue} and then stops every scan node.
 *
 * <p>This method must not propagate any exception. Each step runs inside its
 * own try/catch so that a failure in one step is logged and cannot prevent the
 * other from running. The token is released first and the scan nodes are
 * stopped exactly once; an unguarded stop loop ahead of the release could throw
 * and leave the token held in the query queue.
 */
@Override
public void close() {
    // NOTE: all close method should be no exception
    if (queryQueue != null && queueToken != null) {
        try {
            // Release the token regardless of whether it is running or still queued.
            queryQueue.releaseAndNotify(queueToken);
        } catch (Throwable t) {
            LOG.error("error happens when coordinator close ", t);
        }
    }

    try {
        for (ScanNode scanNode : scanNodes) {
            scanNode.stop();
        }
    } catch (Throwable t) {
        LOG.error("error happens when scannode stop ", t);
    }
}

private void execInternal() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,9 +129,11 @@ public QueueToken getToken() throws UserException {
public void releaseAndNotify(QueueToken releaseToken) {
queueLock.lock();
try {
//NOTE:token's tokenState need to be locked by queueLock
// NOTE:token's tokenState need to be locked by queueLock
if (releaseToken.isReadyToRun()) {
currentRunningQueryNum--;
} else {
priorityTokenQueue.remove(releaseToken);
}
Preconditions.checkArgument(currentRunningQueryNum >= 0);
while (currentRunningQueryNum < maxConcurrency) {
Expand Down Expand Up @@ -165,13 +167,4 @@ public void resetQueueProperty(int maxConcurrency, int maxQueueSize, int queryWa
}
}

/**
 * Removes the given token from {@code priorityTokenQueue} while holding
 * {@code queueLock}.
 *
 * <p>Only the queue is touched here; {@code currentRunningQueryNum} is not
 * adjusted by this method.
 *
 * @param queueToken the token to remove from the priority queue
 */
public void removeToken(QueueToken queueToken) {
// Acquire the lock before entering try so that finally only unlocks a lock we actually hold.
queueLock.lock();
try {
priorityTokenQueue.remove(queueToken);
} finally {
// Always release the lock, even if remove() throws.
queueLock.unlock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,10 @@ public void get(String queryId, int queryTimeout) throws UserException {
try {
future.get(waitTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
queue.removeToken(this);
throw new UserException("query queue timeout, timeout: " + waitTimeout + " ms ");
} catch (CancellationException e) {
queue.removeToken(this);
throw new UserException("query is cancelled");
} catch (Throwable t) {
queue.removeToken(this);
String errMsg = String.format("error happens when query {} queue", queryId);
LOG.error(errMsg, t);
throw new RuntimeException(errMsg, t);
Expand Down

0 comments on commit 057d1f4

Please sign in to comment.