[BugFix] Release workgroup token immediately when fragment is cancelled (backport #19310) #19363

Merged
merged 3 commits on Mar 14, 2023
9 changes: 9 additions & 0 deletions be/src/exec/pipeline/fragment_context.cpp
@@ -8,6 +8,15 @@

namespace starrocks::pipeline {

void FragmentContext::cancel(const Status& status) {
    if (_runtime_state != nullptr && _runtime_state->query_ctx() != nullptr) {
        _runtime_state->query_ctx()->release_workgroup_token_once();
    }

    _runtime_state->set_is_cancelled(true);
    set_final_status(status);
}

void FragmentContext::set_final_status(const Status& status) {
    if (_final_status.load() != nullptr) {
        return;
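
The new cancel() above releases the workgroup's running-query token before the rest of the fragment teardown runs, so the workgroup slot is freed as soon as cancellation starts rather than when the QueryContext is eventually destroyed. A minimal standalone sketch of that ordering, using hypothetical WorkGroup/RunningQueryToken/QueryCtx stand-ins rather than the real StarRocks classes:

#include <cassert>
#include <memory>

// Hypothetical stand-ins for the real StarRocks types.
struct WorkGroup {
    int running_queries = 0;

    // Occupies one running-query slot for as long as it is alive.
    struct RunningQueryToken {
        WorkGroup* wg;
        explicit RunningQueryToken(WorkGroup* w) : wg(w) { ++wg->running_queries; }
        ~RunningQueryToken() { --wg->running_queries; }
    };

    std::unique_ptr<RunningQueryToken> acquire_running_query_token() {
        return std::make_unique<RunningQueryToken>(this);
    }
};

struct QueryCtx {
    std::unique_ptr<WorkGroup::RunningQueryToken> token;
    void release_workgroup_token_once() { token.reset(); }  // hand the slot back early
};

int main() {
    WorkGroup wg;
    QueryCtx query{wg.acquire_running_query_token()};
    assert(wg.running_queries == 1);

    // Cancel path after this change: release the token first, then finish the
    // (potentially slow) fragment teardown. The slot is free immediately, so the
    // workgroup can admit another query without waiting for the QueryCtx itself
    // to be destroyed.
    query.release_workgroup_token_once();
    assert(wg.running_queries == 0);
    return 0;
}
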
5 changes: 1 addition & 4 deletions be/src/exec/pipeline/fragment_context.h
@@ -71,10 +71,7 @@ class FragmentContext {
        return status == nullptr ? Status::OK() : *status;
    }

    void cancel(const Status& status) {
        _runtime_state->set_is_cancelled(true);
        set_final_status(status);
    }
    void cancel(const Status& status);

    void finish() { cancel(Status::OK()); }

8 changes: 8 additions & 0 deletions be/src/exec/pipeline/query_context.cpp
@@ -97,6 +97,7 @@ Status QueryContext::init_query_once(workgroup::WorkGroup* wg) {
            auto maybe_token = wg->acquire_running_query_token();
            if (maybe_token.ok()) {
                _wg_running_query_token_ptr = std::move(maybe_token.value());
                _wg_running_query_token_atomic_ptr = _wg_running_query_token_ptr.get();
            } else {
                st = maybe_token.status();
            }
@@ -106,6 +107,13 @@ Status QueryContext::init_query_once(workgroup::WorkGroup* wg) {
    return st;
}

void QueryContext::release_workgroup_token_once() {
    auto* old = _wg_running_query_token_atomic_ptr.load();
    if (old != nullptr && _wg_running_query_token_atomic_ptr.compare_exchange_strong(old, nullptr)) {
        _wg_running_query_token_ptr.reset();
    }
}

std::shared_ptr<QueryStatisticsRecvr> QueryContext::maintained_query_recv() {
    return _sub_plan_query_statistics_recvr;
}
5 changes: 5 additions & 0 deletions be/src/exec/pipeline/query_context.h
@@ -100,6 +100,10 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {
    std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }

    Status init_query_once(workgroup::WorkGroup* wg);
    /// Release the workgroup token only once, to avoid a double-free.
    /// This method should only be invoked while the QueryContext is still valid,
    /// so that it cannot race with the token release performed during destruction.
    void release_workgroup_token_once();

    // Some statistics about the query, including cpu, scan_rows, scan_bytes
    int64_t mem_cost_bytes() const { return _mem_tracker->peak_consumption(); }
@@ -173,6 +177,7 @@ class QueryContext : public std::enable_shared_from_this<QueryContext> {

    int64_t _scan_limit = 0;
    workgroup::RunningQueryTokenPtr _wg_running_query_token_ptr;
    std::atomic<workgroup::RunningQueryToken*> _wg_running_query_token_atomic_ptr = nullptr;
};

class QueryContextManager {
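
The atomic pointer added above exists so that the token can be released from two places — the explicit release_workgroup_token_once() call on the cancel path and the normal teardown of the QueryContext — with only the first caller actually resetting the unique_ptr. A stripped-down sketch of that compare-and-swap guard (hypothetical Token/TokenHolder names, not the real classes), assuming the holder object itself outlives both calls:

#include <atomic>
#include <memory>

struct Token {};  // stand-in for workgroup::RunningQueryToken

class TokenHolder {
public:
    void init() {
        _token = std::make_unique<Token>();
        _token_atomic = _token.get();
    }

    // Only the caller that wins the compare_exchange resets the unique_ptr, so
    // calling this from both the cancel path and the teardown path cannot free
    // the token twice.
    void release_once() {
        Token* old = _token_atomic.load();
        if (old != nullptr && _token_atomic.compare_exchange_strong(old, nullptr)) {
            _token.reset();
        }
    }

    ~TokenHolder() { release_once(); }

private:
    std::unique_ptr<Token> _token;
    std::atomic<Token*> _token_atomic{nullptr};
};

int main() {
    TokenHolder holder;
    holder.init();
    holder.release_once();  // fragment cancelled: token released immediately
    holder.release_once();  // later teardown: a no-op, the compare-exchange fails
    return 0;
}

As the header comment notes, the guard only prevents releasing the token twice; it does not make the call safe once the QueryContext itself has been destroyed.
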
44 changes: 35 additions & 9 deletions fe/fe-core/src/main/java/com/starrocks/qe/Coordinator.java
@@ -499,6 +499,22 @@ public static TWorkGroup prepareWorkGroup(ConnectContext connect) {
        return workgroup;
    }

    private void handleErrorBackendExecState(BackendExecState errorBackendExecState, TStatusCode errorCode, String errMessage)
            throws UserException, RpcException {
        if (errorBackendExecState != null) {
            cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
            switch (Objects.requireNonNull(errorCode)) {
                case TIMEOUT:
                    throw new UserException("query timeout. backend id: " + errorBackendExecState.backend.getId());
                case THRIFT_RPC_ERROR:
                    SimpleScheduler.addToBlacklist(errorBackendExecState.backend.getId());
                    throw new RpcException(errorBackendExecState.backend.getHost(), "rpc failed");
                default:
                    throw new UserException(errMessage + " backend:" + errorBackendExecState.address.hostname);
            }
        }
    }

    private void prepareResultSink() throws Exception {
        PlanFragmentId topId = fragments.get(0).getFragmentId();
        FragmentExecParams topParams = fragmentExecParamsMap.get(topId);
@@ -547,6 +563,7 @@ private void prepareResultSink() throws Exception {
        for (TUniqueId instanceId : instanceIds) {
            profileDoneSignal.addMark(instanceId, -1L /* value is meaningless */);
        }

        long queryDeliveryTimeoutMs = Math.min(queryOptions.query_timeout, queryOptions.query_delivery_timeout) * 1000L;
        lock();
        try {
@@ -635,6 +652,10 @@ private void prepareResultSink() throws Exception {
                    }
                    futures.add(Pair.create(execState, execState.execRemoteFragmentAsync()));
                }

                BackendExecState errorBackendExecState = null;
                TStatusCode errorCode = null;
                String errMessage = null;
                for (Pair<BackendExecState, Future<PExecPlanFragmentResult>> pair : futures) {
                    TStatusCode code;
                    String errMsg = null;
@@ -664,18 +685,23 @@ private void prepareResultSink() throws Exception {
                        LOG.warn("exec plan fragment failed, errmsg={}, code: {}, fragmentId={}, backend={}:{}",
                                errMsg, code, fragment.getFragmentId(),
                                pair.first.address.hostname, pair.first.address.port);
                        cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
                        switch (Objects.requireNonNull(code)) {
                            case TIMEOUT:
                                throw new UserException("query timeout. backend id: " + pair.first.backend.getId());
                            case THRIFT_RPC_ERROR:
                                SimpleScheduler.addToBlacklist(pair.first.backend.getId());
                                throw new RpcException(pair.first.backend.getHost(), "rpc failed");
                            default:
                                throw new UserException(errMsg);

                        if (errorBackendExecState == null) {
                            errorBackendExecState = pair.first;
                            errorCode = code;
                            errMessage = errMsg;
                        }
                        if (Objects.requireNonNull(code) == TStatusCode.TIMEOUT) {
                            break;
                        }
                    }
                }

                // Defer handling of error results and cancellation of fragment instances
                // (except for TIMEOUT errors) until all the delivery RPCs have completed.
                // Otherwise, the cancellation RPC may arrive at the BE before the RPC that
                // delivers the fragment instance, leaving that instance stale and only
                // releasable after a timeout.
                handleErrorBackendExecState(errorBackendExecState, errorCode, errMessage);
            }
            profileFragmentId += 1;
        }
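
On the FE side, the delivery loop above now only records the first failing backend and keeps draining the remaining delivery futures (breaking early only on TIMEOUT); cancellation and the final exception are raised afterwards via handleErrorBackendExecState. A rough sketch of that "record the first error, cancel only after every delivery has been observed" ordering, written in C++ with generic placeholder types rather than the Coordinator's real ones:

#include <future>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

enum class StatusCode { OK, TIMEOUT, RPC_ERROR };

struct DeliveryResult {
    StatusCode code = StatusCode::OK;
    std::string message;
};

// Placeholder: send cancel RPCs to every already-delivered fragment instance.
void cancel_all_instances() {}

void wait_for_deliveries(std::vector<std::future<DeliveryResult>>& futures) {
    std::optional<DeliveryResult> first_error;

    // Wait for every delivery RPC before cancelling anything. Cancelling as soon
    // as the first error shows up would let the cancel RPC overtake a delivery
    // RPC that is still in flight, leaving that instance stale on the BE until
    // it times out. TIMEOUT is the exception: waiting longer will not help.
    for (auto& f : futures) {
        DeliveryResult r = f.get();
        if (r.code != StatusCode::OK && !first_error) {
            first_error = r;
        }
        if (r.code == StatusCode::TIMEOUT) {
            break;
        }
    }

    // Mirrors the role of handleErrorBackendExecState: cancel everything, then
    // surface the first recorded error to the caller.
    if (first_error) {
        cancel_all_instances();
        throw std::runtime_error(first_error->message);
    }
}

int main() {
    std::vector<std::future<DeliveryResult>> futures;
    futures.push_back(std::async(std::launch::async, [] { return DeliveryResult{}; }));
    futures.push_back(std::async(std::launch::async, [] {
        return DeliveryResult{StatusCode::RPC_ERROR, "rpc failed"};
    }));
    try {
        wait_for_deliveries(futures);
    } catch (const std::exception&) {
        // The first recorded error surfaces here, after all deliveries were observed.
    }
    return 0;
}
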