Merged
@@ -5350,7 +5350,7 @@ public AnalysisTaskScheduler getAnalysisJobScheduler() {
// 1. handle partition level analysis statement properly
// 2. support sample job
// 3. support period job
- public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+ public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
analysisManager.createAnalysisJob(analyzeStmt);
}

@@ -191,11 +191,11 @@ public CreateTableStmt buildAnalysisJobTblStmt() throws UserException {
columnDefs.add(new ColumnDef("schedule_type", TypeDef.createVarchar(32)));
String engineName = "olap";
KeysDesc keysDesc = new KeysDesc(KeysType.UNIQUE_KEYS,
- Lists.newArrayList("job_id"));
+ Lists.newArrayList("job_id", "task_id"));

DistributionDesc distributionDesc = new HashDistributionDesc(
StatisticConstants.STATISTIC_TABLE_BUCKET_COUNT,
- Lists.newArrayList("job_id"));
+ Lists.newArrayList("job_id", "task_id"));
Map<String, String> properties = new HashMap<String, String>() {
{
put("replication_num", String.valueOf(Config.statistic_internal_table_replica_num));
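The key change above matters because the internal analysis-job table uses the UNIQUE KEYS model, where rows whose key columns are equal replace one another. With job_id alone as the key, every task row of the same job would collapse into a single row; keying by (job_id, task_id) keeps one row per task and leaves room for the job-level record that is written with task_id = -1. The following is only a hypothetical, in-memory sketch of that replace-on-equal-key behaviour, not code from this patch:

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of unique-key semantics: entries with equal keys replace each other.
public class UniqueKeyDemo {
    public static void main(String[] args) {
        long jobId = 10001L;

        // Old key (job_id only): the second task row silently overwrites the first.
        Map<Long, String> keyedByJobId = new HashMap<>();
        keyedByJobId.put(jobId, "task 101, column a");
        keyedByJobId.put(jobId, "task 102, column b");
        System.out.println(keyedByJobId.size()); // 1 -- one task row is lost

        // New key (job_id, task_id): each task keeps its own row,
        // and the job-level record is stored alongside them with task_id = -1.
        Map<String, String> keyedByJobAndTask = new HashMap<>();
        keyedByJobAndTask.put(jobId + ":" + -1, "job-level record");
        keyedByJobAndTask.put(jobId + ":" + 101, "task 101, column a");
        keyedByJobAndTask.put(jobId + ":" + 102, "task 102, column b");
        System.out.println(keyedByJobAndTask.size()); // 3
    }
}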
@@ -62,7 +62,7 @@ public class AnalysisManager {

private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE "
+ FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE + " "
- + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId}";
+ + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}";

private static final String SHOW_JOB_STATE_SQL_TEMPLATE = "SELECT "
+ "job_id, catalog_name, db_name, tbl_name, col_name, job_type, "
@@ -90,7 +90,8 @@ public StatisticsCache getStatisticsCache() {
return statisticsCache;
}

- public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
+ // Each analyze stmt corresponds to an analysis job.
+ public void createAnalysisJob(AnalyzeStmt analyzeStmt) throws DdlException {
String catalogName = analyzeStmt.getCatalogName();
String db = analyzeStmt.getDBName();
TableName tbl = analyzeStmt.getTblName();
@@ -99,7 +100,6 @@ public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
Set<String> partitionNames = analyzeStmt.getPartitionNames();
Map<Long, AnalysisTaskInfo> analysisTaskInfos = new HashMap<>();
long jobId = Env.getCurrentEnv().getNextId();

// If the analysis is not incremental, need to delete existing statistics.
// we cannot collect histograms incrementally and do not support it
if (!analyzeStmt.isIncrement && !analyzeStmt.isHistogram) {
@@ -112,58 +112,87 @@ public void createAnalysisJob(AnalyzeStmt analyzeStmt) {
StatisticsRepository.dropStatistics(dbId, tblIds, colNames, partIds);
}

- if (colNames != null) {
- for (String colName : colNames) {
+ createTaskForEachColumns(analyzeStmt, catalogName, db, tbl, colNames, partitionNames, analysisTaskInfos, jobId);
+ createTaskForMVIdx(analyzeStmt, catalogName, db, tbl, partitionNames, analysisTaskInfos, jobId);
+ persistAnalysisJob(catalogName, db, tbl, jobId);
+
+ if (analyzeStmt.isSync()) {
+ syncExecute(analysisTaskInfos.values());
+ return;
+ }
+
+ analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
+ analysisTaskInfos.values().forEach(taskScheduler::schedule);
+ }
+
+ private void persistAnalysisJob(String catalogName, String db, TableName tbl,
+ long jobId) throws DdlException {
+ try {
+ AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+ jobId).setTaskId(-1)
+ .setCatalogName(catalogName).setDbName(db)
+ .setTblName(tbl.getTbl())
+ .setJobType(JobType.MANUAL)
+ .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
+ .setScheduleType(ScheduleType.ONCE).build();
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+ } catch (Throwable t) {
+ throw new DdlException(t.getMessage(), t);
+ }
+ }
+
+ private void createTaskForMVIdx(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+ Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos, long jobId) throws DdlException {
+ if (!(analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP))) {
+ return;
+ }
+ OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+ try {
+ olapTable.readLock();
+ for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
+ if (meta.getDefineStmt() == null) {
+ continue;
+ }
long taskId = Env.getCurrentEnv().getNextId();
- AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
- AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
- .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
- .setTblName(tbl.getTbl()).setColName(colName)
- .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
- .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
- .setState(AnalysisState.PENDING)
+ AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
+ jobId).setTaskId(taskId)
+ .setCatalogName(catalogName).setDbName(db)
+ .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
+ .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
+ .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
.setScheduleType(ScheduleType.ONCE).build();
try {
- StatisticsRepository.createAnalysisTask(analysisTaskInfo);
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
} catch (Exception e) {
- throw new RuntimeException("Failed to create analysis job", e);
+ throw new DdlException("Failed to create analysis task", e);
}
analysisTaskInfos.put(taskId, analysisTaskInfo);
}
+ } finally {
+ olapTable.readUnlock();
}
- if (analyzeStmt.isWholeTbl && analyzeStmt.getTable().getType().equals(TableType.OLAP)) {
- OlapTable olapTable = (OlapTable) analyzeStmt.getTable();
+ }
+
+ private void createTaskForEachColumns(AnalyzeStmt analyzeStmt, String catalogName, String db, TableName tbl,
+ Set<String> colNames, Set<String> partitionNames, Map<Long, AnalysisTaskInfo> analysisTaskInfos,
+ long jobId) throws DdlException {
+ for (String colName : colNames) {
+ long taskId = Env.getCurrentEnv().getNextId();
+ AnalysisType analType = analyzeStmt.isHistogram ? AnalysisType.HISTOGRAM : AnalysisType.COLUMN;
+ AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(jobId)
+ .setTaskId(taskId).setCatalogName(catalogName).setDbName(db)
+ .setTblName(tbl.getTbl()).setColName(colName)
+ .setPartitionNames(partitionNames).setJobType(JobType.MANUAL)
+ .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(analType)
+ .setState(AnalysisState.PENDING)
+ .setScheduleType(ScheduleType.ONCE).build();
try {
- olapTable.readLock();
- for (MaterializedIndexMeta meta : olapTable.getIndexIdToMeta().values()) {
- if (meta.getDefineStmt() == null) {
- continue;
- }
- long taskId = Env.getCurrentEnv().getNextId();
- AnalysisTaskInfo analysisTaskInfo = new AnalysisTaskInfoBuilder().setJobId(
- jobId).setTaskId(taskId)
- .setCatalogName(catalogName).setDbName(db)
- .setTblName(tbl.getTbl()).setPartitionNames(partitionNames)
- .setIndexId(meta.getIndexId()).setJobType(JobType.MANUAL)
- .setAnalysisMethod(AnalysisMethod.FULL).setAnalysisType(AnalysisType.INDEX)
- .setScheduleType(ScheduleType.ONCE).build();
- try {
- StatisticsRepository.createAnalysisTask(analysisTaskInfo);
- } catch (Exception e) {
- throw new RuntimeException("Failed to create analysis job", e);
- }
- analysisTaskInfos.put(taskId, analysisTaskInfo);
- }
- } finally {
- olapTable.readUnlock();
+ StatisticsRepository.persistAnalysisTask(analysisTaskInfo);
+ } catch (Exception e) {
+ throw new DdlException("Failed to create analysis task", e);
}
+ analysisTaskInfos.put(taskId, analysisTaskInfo);
}
- if (analyzeStmt.isSync()) {
- syncExecute(analysisTaskInfos.values());
- return;
- }
- analysisJobIdToTaskMap.put(jobId, analysisTaskInfos);
- analysisTaskInfos.values().forEach(taskScheduler::schedule);
}

public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
@@ -172,16 +201,23 @@ public void updateTaskStatus(AnalysisTaskInfo info, AnalysisState jobState, String message, long time) {
params.put("message", StringUtils.isNotEmpty(message) ? String.format(", message = '%s'", message) : "");
params.put("updateExecTime", time == -1 ? "" : ", last_exec_time_in_ms=" + time);
params.put("jobId", String.valueOf(info.jobId));
+ params.put("taskId", String.valueOf(info.taskId));
try {
StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
} catch (Exception e) {
- LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+ LOG.warn(String.format("Failed to update state for task: %d, %d", info.jobId, info.taskId), e);
} finally {
info.state = jobState;
if (analysisJobIdToTaskMap.get(info.jobId).values()
.stream().allMatch(i -> i.state != null
&& i.state != AnalysisState.PENDING && i.state != AnalysisState.RUNNING)) {
analysisJobIdToTaskMap.remove(info.jobId);
+ params.put("taskId", String.valueOf(-1));
+ try {
+ StatisticsUtil.execUpdate(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
+ } catch (Exception e) {
+ LOG.warn(String.format("Failed to update state for job: %s", info.jobId), e);
+ }
}

}
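Taken together, the AnalysisManager changes persist one row per task plus a job-level row keyed by task_id = -1, and updateTaskStatus now updates the finished task's row first and then, once every task of the job has left the PENDING/RUNNING states, re-runs the same template against the job row. Below is a minimal sketch of how the UPDATE template is filled in with org.apache.commons.text.StringSubstitutor (the same class the patch itself uses); the table name, ids, and main method are stand-ins invented for the example, not values from the patch:

import java.util.HashMap;
import java.util.Map;

import org.apache.commons.text.StringSubstitutor;

// Illustrative only: prints the SQL the template would produce for a task row and for the job row.
public class UpdateTemplateDemo {

    // Stand-in for FeConstants.INTERNAL_DB_NAME + "." + StatisticConstants.ANALYSIS_JOB_TABLE.
    private static final String UPDATE_JOB_STATE_SQL_TEMPLATE = "UPDATE internal_db.analysis_jobs "
            + "SET state = '${jobState}' ${message} ${updateExecTime} WHERE job_id = ${jobId} and task_id=${taskId}";

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("jobState", "FINISHED");
        params.put("message", "");                                  // only set when a failure message exists
        params.put("updateExecTime", ", last_exec_time_in_ms=1700000000000");
        params.put("jobId", "10001");

        // 1) Update the row of the task that just reached a terminal state.
        params.put("taskId", "10002");
        System.out.println(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));

        // 2) Once all tasks of job 10001 are done, update the job-level row (task_id = -1).
        params.put("taskId", String.valueOf(-1));
        System.out.println(new StringSubstitutor(params).replace(UPDATE_JOB_STATE_SQL_TEMPLATE));
    }
}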
@@ -127,4 +127,8 @@ public String toString() {
public AnalysisState getState() {
return state;
}

+ public boolean isJob() {
+ return taskId == -1;
+ }
}
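Because the job itself is stored as a pseudo-task with taskId = -1, isJob() gives callers a cheap way to tell the job-level record apart from real task records when both come back from the same table. A hypothetical caller-side use (not part of this patch), assuming the rows have already been loaded into a List<AnalysisTaskInfo>:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper showing how AnalysisTaskInfo.isJob() could be used by a caller.
public final class JobRowSplitExample {

    public static void printJobAndTaskCounts(List<AnalysisTaskInfo> rows) {
        // true -> the single job-level record (task_id = -1); false -> the per-column / per-index tasks.
        Map<Boolean, List<AnalysisTaskInfo>> split =
                rows.stream().collect(Collectors.partitioningBy(AnalysisTaskInfo::isJob));

        System.out.println("job rows: " + split.get(true).size()
                + ", task rows: " + split.get(false).size());
    }
}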
@@ -198,14 +198,14 @@ private static <T> void buildPredicate(String fieldName, Set<T> fieldValues, Str
predicate.append(partPredicate);
}

- public static void createAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
+ public static void persistAnalysisTask(AnalysisTaskInfo analysisTaskInfo) throws Exception {
Map<String, String> params = new HashMap<>();
params.put("jobId", String.valueOf(analysisTaskInfo.jobId));
params.put("taskId", String.valueOf(analysisTaskInfo.taskId));
params.put("catalogName", analysisTaskInfo.catalogName);
params.put("dbName", analysisTaskInfo.dbName);
params.put("tblName", analysisTaskInfo.tblName);
- params.put("colName", analysisTaskInfo.colName);
+ params.put("colName", analysisTaskInfo.colName == null ? "" : analysisTaskInfo.colName);
params.put("indexId", analysisTaskInfo.indexId == null ? "-1" : String.valueOf(analysisTaskInfo.indexId));
params.put("jobType", analysisTaskInfo.jobType.toString());
params.put("analysisType", analysisTaskInfo.analysisMethod.toString());