Skip to content

Commit

Permalink
[improvement](stats) Add lifecycle hooks to AnalysisTask to make code…
Browse files Browse the repository at this point in the history
…s more clear (#22658)
  • Loading branch information
Kikyou1997 authored Aug 8, 2023
1 parent a04e30d commit b5d7e6e
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public void execute(ThreadPoolExecutor executor) {
return;
}
try {
task.doExecute();
task.execute();
updateSyncTaskStatus(task, AnalysisState.FINISHED);
} catch (Throwable t) {
colNames.add(task.info.colName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.StmtExecutor;
Expand All @@ -33,8 +32,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public abstract class BaseAnalysisTask {
Expand Down Expand Up @@ -102,8 +99,6 @@ public abstract class BaseAnalysisTask {

protected StmtExecutor stmtExecutor;

protected Set<PrimitiveType> unsupportedType = new HashSet<>();

protected volatile boolean killed;

@VisibleForTesting
Expand All @@ -116,17 +111,7 @@ public BaseAnalysisTask(AnalysisInfo info) {
init(info);
}

protected void initUnsupportedType() {
unsupportedType.add(PrimitiveType.HLL);
unsupportedType.add(PrimitiveType.BITMAP);
unsupportedType.add(PrimitiveType.ARRAY);
unsupportedType.add(PrimitiveType.MAP);
unsupportedType.add(PrimitiveType.JSONB);
unsupportedType.add(PrimitiveType.STRUCT);
}

private void init(AnalysisInfo info) {
initUnsupportedType();
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
if (catalog == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
Expand Down Expand Up @@ -162,6 +147,16 @@ private void init(AnalysisInfo info) {
}

public void execute() {
prepareExecution();
executeWithRetry();
afterExecution();
}

protected void prepareExecution() {
setTaskStateToRunning();
}

protected void executeWithRetry() {
int retriedTimes = 0;
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (killed) {
Expand All @@ -182,6 +177,10 @@ public void execute() {

public abstract void doExecute() throws Exception;

protected void afterExecution() {
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
Expand All @@ -197,10 +196,6 @@ public void cancel() {
String.format("Job has been cancelled: %s", info.message), System.currentTimeMillis());
}

public long getLastExecTime() {
return info.lastExecTimeInMs;
}

public long getJobId() {
return info.jobId;
}
Expand All @@ -213,10 +208,6 @@ protected String getDataSizeFunction(Column column) {
return "COUNT(1) * " + column.getType().getSlotSize();
}

private boolean isUnsupportedType(PrimitiveType type) {
return unsupportedType.contains(type);
}

protected String getSampleExpression() {
if (info.analysisMethod == AnalysisMethod.FULL) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -106,7 +105,6 @@ public HMSAnalysisTask(AnalysisInfo info) {
}

public void doExecute() throws Exception {
setTaskStateToRunning();
if (isTableLevelTask) {
getTableStats();
} else {
Expand Down Expand Up @@ -232,8 +230,6 @@ private void getTableColumnStats() throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(
catalog.getId(), db.getId(), tbl.getId(), -1, col.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public HistogramTask(AnalysisInfo info) {

@Override
public void doExecute() throws Exception {
setTaskStateToRunning();
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("histogramStatTbl", StatisticConstants.HISTOGRAM_TBL_NAME);
Expand All @@ -80,6 +79,11 @@ public void doExecute() throws Exception {
Env.getCurrentEnv().getStatisticsCache().refreshHistogramSync(tbl.getId(), -1, col.getName());
}

@Override
protected void afterExecution() {
// DO NOTHING
}

private String getSampleRateFunction() {
if (info.analysisMethod == AnalysisMethod.FULL) {
return "0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ private void init() {

@Override
public void doExecute() throws Exception {
setTaskStateToRunning();
for (Column column : meta.getSchema()) {
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
TableRef tableRef = selectOne.getTableRefs().get(0);
Expand Down Expand Up @@ -146,4 +145,9 @@ private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
}
return false;
}

@Override
protected void afterExecution() {
// DO NOTHING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.AutoCloseConnectContext;
Expand Down Expand Up @@ -55,7 +54,6 @@ public OlapAnalysisTask(AnalysisInfo info) {
}

public void doExecute() throws Exception {
setTaskStateToRunning();
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
Expand Down Expand Up @@ -93,7 +91,6 @@ public void doExecute() throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

@VisibleForTesting
Expand Down

0 comments on commit b5d7e6e

Please sign in to comment.