Skip to content

Commit

Permalink
unified cancel method in coordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
wangbo committed Sep 20, 2024
1 parent ed2ec7b commit e2ee9c1
Show file tree
Hide file tree
Showing 15 changed files with 35 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
Expand All @@ -45,6 +46,7 @@
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Strings;
Expand Down Expand Up @@ -316,7 +318,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, "load job failed"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ public Object killQuery(HttpServletRequest request, HttpServletResponse response
}

ExecuteEnv env = ExecuteEnv.getInstance();
env.getScheduler().cancelQuery(queryId);
env.getScheduler().cancelQuery(queryId, "cancel query by rest api");
return ResponseEntityBuilder.ok();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.task.AbstractTask;
Expand All @@ -33,6 +34,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -219,7 +221,7 @@ protected void executeCancelLogic() {
}
isCanceled.getAndSet(true);
if (null != stmtExecutor) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "insert task cancelled"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.TimeUtils;
Expand All @@ -50,6 +51,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -258,7 +260,7 @@ public synchronized void onSuccess() throws JobException {
protected synchronized void executeCancelLogic() {
LOG.info("mtmv task cancel, taskId: {}", super.getTaskId());
if (executor != null) {
executor.cancel();
executor.cancel(new Status(TStatusCode.CANCELLED, "mtmv task cancelled"));
}
after();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Status;
import org.apache.doris.load.ExportFailMsg.CancelType;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
Expand All @@ -35,6 +36,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -156,7 +158,7 @@ public void cancel() throws JobException {
}
isCanceled.getAndSet(true);
if (stmtExecutor != null) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "export task cancelled"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.QuotaExceedException;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
Expand All @@ -57,6 +58,7 @@
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.thrift.TEtlState;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.AbstractTxnStateChangeCallback;
import org.apache.doris.transaction.BeginTransactionException;
Expand Down Expand Up @@ -607,7 +609,7 @@ protected void unprotectedExecuteRetry(FailMsg failMsg) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg()));
}
}

Expand Down Expand Up @@ -671,7 +673,7 @@ protected void unprotectedExecuteCancel(FailMsg failMsg, boolean abortTxn) {
for (TUniqueId loadId : loadIds) {
Coordinator coordinator = QeProcessorImpl.INSTANCE.getCoordinator(loadId);
if (coordinator != null) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, failMsg.getMsg()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.nereids.NereidsPlanner;
Expand All @@ -37,6 +38,7 @@
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.task.LoadEtlTask;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TStatusCode;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -141,7 +143,7 @@ protected final void execImpl(StmtExecutor executor, long jobId) throws Exceptio
}
boolean notTimeout = coordinator.join(execTimeout);
if (!coordinator.isDone()) {
coordinator.cancel();
coordinator.cancel(new Status(TStatusCode.CANCELLED, "insert timeout"));
if (notTimeout) {
errMsg = coordinator.getExecStatus().getErrorMsg();
ErrorReport.reportDdlException("there exists unhealthy backend. "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ public void kill(boolean killConnection) {
closeChannel();
}
// Now, cancel running query.
cancelQuery();
cancelQuery("cancel query by user");
}

// Kill operation triggered by timeout, performed without protection.
Expand All @@ -960,10 +960,10 @@ private void killByTimeout(boolean killConnection) {
}
}

public void cancelQuery() {
public void cancelQuery(String cancelReason) {
StmtExecutor executorRef = executor;
if (executorRef != null) {
executorRef.cancel();
executorRef.cancel(new Status(TStatusCode.CANCELLED, cancelReason));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,11 +145,11 @@ public ConnectContext getContext(String flightToken) {
return null;
}

public void cancelQuery(String queryId) {
public void cancelQuery(String queryId, String cancelReason) {
for (ConnectContext ctx : connectionMap.values()) {
TUniqueId qid = ctx.queryId();
if (qid != null && DebugUtil.printId(qid).equals(queryId)) {
ctx.cancelQuery();
ctx.cancelQuery(cancelReason);
break;
}
}
Expand Down
11 changes: 2 additions & 9 deletions fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -1280,18 +1280,11 @@ public Status shouldCancel(List<Backend> currentBackends) {
}
}

// Cancel execution of query. This includes the execution of the local plan
// fragment, if any, as well as all plan fragments on remote nodes.
public void cancel() {
cancel(new Status(TStatusCode.CANCELLED, "query is cancelled by user"));
@Override
public void cancel(Status cancelReason) {
if (queueToken != null) {
queueToken.cancel();
}
}

@Override
public void cancel(Status cancelReason) {
for (ScanNode scanNode : scanNodes) {
scanNode.stop();
}
Expand Down
18 changes: 2 additions & 16 deletions fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1528,7 +1528,7 @@ private void resetAnalyzerAndStmt() {
}

// Because this is called by another thread
public void cancel() {
public void cancel(Status cancelReason) {
if (masterOpExecutor != null) {
try {
masterOpExecutor.cancel();
Expand All @@ -1544,7 +1544,7 @@ public void cancel() {
}
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel();
coordRef.cancel(cancelReason);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
Expand All @@ -1570,20 +1570,6 @@ private Optional<InsertOverwriteTableCommand> getInsertOverwriteTableCommand() {
return Optional.empty();
}

// Because this is called by another thread
public void cancel(Status cancelReason) {
Coordinator coordRef = coord;
if (coordRef != null) {
coordRef.cancel(cancelReason);
}
if (mysqlLoadId != null) {
Env.getCurrentEnv().getLoadManager().getMysqlLoadManager().cancelMySqlLoad(mysqlLoadId);
}
if (parsedStmt instanceof AnalyzeTblStmt || parsedStmt instanceof AnalyzeDBStmt) {
Env.getCurrentEnv().getAnalysisManager().cancelSyncTask(context);
}
}

// Handle kill statement.
private void handleKill() throws UserException {
KillStmt killStmt = (KillStmt) parsedStmt;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void exec(WorkloadQueryInfo queryInfo) {
&& queryInfo.tUniqueId != null
&& QeProcessorImpl.INSTANCE.getCoordinator(queryInfo.tUniqueId) != null) {
LOG.info("cancel query {} triggered by query schedule policy.", queryInfo.queryId);
queryInfo.context.cancelQuery();
queryInfo.context.cancelQuery("cancel query by workload policy");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
TUniqueId queryId = params.getQueryId();
ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
if (ctx != null) {
ctx.cancelQuery();
ctx.cancelQuery("cancel query by forward request.");
}
final TMasterOpResult result = new TMasterOpResult();
result.setStatusCode(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void kill(boolean killConnection) {
connectScheduler.unregisterConnection(this);
}
// Now, cancel running query.
cancelQuery();
cancelQuery("arrow flight query killed by user");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.AuditLogHelper;
Expand All @@ -36,6 +37,7 @@
import org.apache.doris.statistics.AnalysisInfo.AnalysisType;
import org.apache.doris.statistics.util.DBObjects;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
Expand Down Expand Up @@ -267,7 +269,7 @@ protected void setTaskStateToRunning() {
public void cancel() {
killed = true;
if (stmtExecutor != null) {
stmtExecutor.cancel();
stmtExecutor.cancel(new Status(TStatusCode.CANCELLED, "analysis task cancelled"));
}
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.FAILED,
Expand Down

0 comments on commit e2ee9c1

Please sign in to comment.