Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improvement](nereids) Refactor analysis task #22658

Merged
merged 1 commit into from
Aug 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public void execute(ThreadPoolExecutor executor) {
return;
}
try {
task.doExecute();
task.execute();
updateSyncTaskStatus(task, AnalysisState.FINISHED);
} catch (Throwable t) {
colNames.add(task.info.colName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.qe.StmtExecutor;
Expand All @@ -33,8 +32,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public abstract class BaseAnalysisTask {
Expand Down Expand Up @@ -102,8 +99,6 @@ public abstract class BaseAnalysisTask {

protected StmtExecutor stmtExecutor;

protected Set<PrimitiveType> unsupportedType = new HashSet<>();

protected volatile boolean killed;

@VisibleForTesting
Expand All @@ -116,17 +111,7 @@ public BaseAnalysisTask(AnalysisInfo info) {
init(info);
}

protected void initUnsupportedType() {
unsupportedType.add(PrimitiveType.HLL);
unsupportedType.add(PrimitiveType.BITMAP);
unsupportedType.add(PrimitiveType.ARRAY);
unsupportedType.add(PrimitiveType.MAP);
unsupportedType.add(PrimitiveType.JSONB);
unsupportedType.add(PrimitiveType.STRUCT);
}

private void init(AnalysisInfo info) {
initUnsupportedType();
catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(info.catalogName);
if (catalog == null) {
Env.getCurrentEnv().getAnalysisManager().updateTaskStatus(info, AnalysisState.FAILED,
Expand Down Expand Up @@ -162,6 +147,16 @@ private void init(AnalysisInfo info) {
}

public void execute() {
prepareExecution();
executeWithRetry();
afterExecution();
}

protected void prepareExecution() {
setTaskStateToRunning();
}

protected void executeWithRetry() {
int retriedTimes = 0;
while (retriedTimes <= StatisticConstants.ANALYZE_TASK_RETRY_TIMES) {
if (killed) {
Expand All @@ -182,6 +177,10 @@ public void execute() {

public abstract void doExecute() throws Exception;

protected void afterExecution() {
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

protected void setTaskStateToRunning() {
Env.getCurrentEnv().getAnalysisManager()
.updateTaskStatus(info, AnalysisState.RUNNING, "", System.currentTimeMillis());
Expand All @@ -197,10 +196,6 @@ public void cancel() {
String.format("Job has been cancelled: %s", info.message), System.currentTimeMillis());
}

public long getLastExecTime() {
return info.lastExecTimeInMs;
}

public long getJobId() {
return info.jobId;
}
Expand All @@ -213,10 +208,6 @@ protected String getDataSizeFunction(Column column) {
return "COUNT(1) * " + column.getType().getSlotSize();
}

private boolean isUnsupportedType(PrimitiveType type) {
return unsupportedType.contains(type);
}

protected String getSampleExpression() {
if (info.analysisMethod == AnalysisMethod.FULL) {
return "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.util.TimeUtils;
Expand Down Expand Up @@ -106,7 +105,6 @@ public HMSAnalysisTask(AnalysisInfo info) {
}

public void doExecute() throws Exception {
setTaskStateToRunning();
if (isTableLevelTask) {
getTableStats();
} else {
Expand Down Expand Up @@ -232,8 +230,6 @@ private void getTableColumnStats() throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(sb.toString());
executeInsertSql(sql);
Env.getCurrentEnv().getStatisticsCache().refreshColStatsSync(
catalog.getId(), db.getId(), tbl.getId(), -1, col.getName());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public HistogramTask(AnalysisInfo info) {

@Override
public void doExecute() throws Exception {
setTaskStateToRunning();
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("histogramStatTbl", StatisticConstants.HISTOGRAM_TBL_NAME);
Expand All @@ -80,6 +79,11 @@ public void doExecute() throws Exception {
Env.getCurrentEnv().getStatisticsCache().refreshHistogramSync(tbl.getId(), -1, col.getName());
}

@Override
protected void afterExecution() {
// DO NOTHING
}

private String getSampleRateFunction() {
if (info.analysisMethod == AnalysisMethod.FULL) {
return "0";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ private void init() {

@Override
public void doExecute() throws Exception {
setTaskStateToRunning();
for (Column column : meta.getSchema()) {
SelectStmt selectOne = (SelectStmt) selectStmt.clone();
TableRef tableRef = selectOne.getTableRefs().get(0);
Expand Down Expand Up @@ -146,4 +145,9 @@ private boolean isCorrespondingToColumn(SelectListItem item, Column column) {
}
return false;
}

@Override
protected void afterExecution() {
// DO NOTHING
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.statistics;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.FeConstants;
import org.apache.doris.qe.AutoCloseConnectContext;
Expand Down Expand Up @@ -55,7 +54,6 @@ public OlapAnalysisTask(AnalysisInfo info) {
}

public void doExecute() throws Exception {
setTaskStateToRunning();
Map<String, String> params = new HashMap<>();
params.put("internalDB", FeConstants.INTERNAL_DB_NAME);
params.put("columnStatTbl", StatisticConstants.STATISTIC_TBL_NAME);
Expand Down Expand Up @@ -93,7 +91,6 @@ public void doExecute() throws Exception {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(ANALYZE_COLUMN_SQL_TEMPLATE);
execSQL(sql);
Env.getCurrentEnv().getStatisticsCache().syncLoadColStats(tbl.getId(), -1, col.getName());
}

@VisibleForTesting
Expand Down
Loading