Skip to content

Commit

Permalink
Fix bug where all-zero column stats are written when no partition needs to be analyzed.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li committed Jul 25, 2024
1 parent d610a2d commit 6339662
Show file tree
Hide file tree
Showing 8 changed files with 73 additions and 39 deletions.
31 changes: 8 additions & 23 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1204,33 +1204,18 @@ public boolean needReAnalyzeTable(TableStatsMeta tblStats) {

@Override
public Map<String, Set<String>> findReAnalyzeNeededPartitions() {
TableStatsMeta tableStats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(getId());
Set<String> allPartitions = getPartitionNames().stream().map(this::getPartition)
.filter(Partition::hasData).map(Partition::getName).collect(Collectors.toSet());
if (tableStats == null) {
Map<String, Set<String>> ret = Maps.newHashMap();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
ret.put(col.getName(), allPartitions);
}
return ret;
}
Map<String, Set<String>> colToPart = new HashMap<>();
Set<String> partitions = Sets.newHashSet();
// Do not filter out unchanged partitions here, because doing so may cause unexpected behavior.
// Use a dummy partition instead so that the filtering is skipped.
partitions.add("Dummy Partition");
Map<String, Set<String>> colToParts = new HashMap<>();
for (Column col : getSchemaAllIndexes(false)) {
if (StatisticsUtil.isUnsupportedType(col.getType())) {
continue;
}
long lastUpdateTime = tableStats.findColumnLastUpdateTime(col.getName());
Set<String> partitions = getPartitionNames().stream()
.map(this::getPartition)
.filter(Partition::hasData)
.filter(partition -> partition.getVisibleVersionTime() >= lastUpdateTime).map(Partition::getName)
.collect(Collectors.toSet());
colToPart.put(col.getName(), partitions);
}
return colToPart;
colToParts.put(col.getName(), partitions);
}
return colToParts;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,9 @@ public enum ScheduleType {

@SerializedName("emptyJob")
public final boolean emptyJob;

@SerializedName("rowCount")
public final long rowCount;
/**
*
* Used to store the newest partition version of tbl when creating this job.
Expand All @@ -206,7 +209,8 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
long lastExecTimeInMs, long timeCostInMs, AnalysisState state, ScheduleType scheduleType,
boolean isExternalTableLevelTask, boolean partitionOnly, boolean samplingPartition,
boolean isAllPartition, long partitionCount, CronExpression cronExpression, boolean forceFull,
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject) {
boolean usingSqlForPartitionColumn, long tblUpdateTime, boolean emptyJob, boolean userInject,
long rowCount) {
this.jobId = jobId;
this.taskId = taskId;
this.taskIds = taskIds;
Expand Down Expand Up @@ -244,6 +248,7 @@ public AnalysisInfo(long jobId, long taskId, List<Long> taskIds, long catalogId,
this.tblUpdateTime = tblUpdateTime;
this.emptyJob = emptyJob;
this.userInject = userInject;
this.rowCount = rowCount;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class AnalysisInfoBuilder {
private long tblUpdateTime;
private boolean emptyJob;
private boolean userInject;
private long rowCount;

public AnalysisInfoBuilder() {
}
Expand Down Expand Up @@ -103,6 +104,7 @@ public AnalysisInfoBuilder(AnalysisInfo info) {
tblUpdateTime = info.tblUpdateTime;
emptyJob = info.emptyJob;
userInject = info.userInject;
rowCount = info.rowCount;
}

public AnalysisInfoBuilder setJobId(long jobId) {
Expand Down Expand Up @@ -275,12 +277,17 @@ public AnalysisInfoBuilder setUserInject(boolean userInject) {
return this;
}

/**
 * Sets the row count that {@link #build()} passes on to the constructed {@link AnalysisInfo}.
 *
 * @param rowCount the row count value to store in this builder
 * @return this builder, so that setter calls can be chained
 */
public AnalysisInfoBuilder setRowCount(long rowCount) {
this.rowCount = rowCount;
return this;
}

public AnalysisInfo build() {
return new AnalysisInfo(jobId, taskId, taskIds, catalogId, dbId, tblId, colToPartitions, partitionNames,
colName, indexId, jobType, analysisMode, analysisMethod, analysisType, samplePercent,
sampleRows, maxBucketNum, periodTimeInMs, message, lastExecTimeInMs, timeCostInMs, state, scheduleType,
externalTableLevelTask, partitionOnly, samplingPartition, isAllPartition, partitionCount,
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject);
cronExpression, forceFull, usingSqlForPartitionColumn, tblUpdateTime, emptyJob, userInject, rowCount);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -309,10 +310,13 @@ private void sendJobId(List<AnalysisInfo> analysisInfos, boolean proxy) {
private Map<String, Set<String>> validateAndGetPartitions(TableIf table, Set<String> columnNames,
Set<String> partitionNames, AnalysisType analysisType) throws DdlException {

Set<String> dummyPartitions = Sets.newHashSet();
// validateAndGetPartitions is to be deprecated; for now, use a dummy partition when no partition names are given.
dummyPartitions.add("Dummy Partition");
Map<String, Set<String>> columnToPartitions = columnNames.stream()
.collect(Collectors.toMap(
columnName -> columnName,
columnName -> new HashSet<>(partitionNames == null ? Collections.emptySet() : partitionNames)
columnName -> new HashSet<>(partitionNames == null ? dummyPartitions : partitionNames)
));

if (analysisType == AnalysisType.HISTOGRAM) {
Expand Down Expand Up @@ -405,6 +409,8 @@ public AnalysisInfo buildAnalysisJobInfo(AnalyzeTblStmt stmt) throws DdlExceptio
infoBuilder.setTaskIds(Lists.newArrayList());
infoBuilder.setTblUpdateTime(table.getUpdateTime());
infoBuilder.setEmptyJob(table instanceof OlapTable && table.getRowCount() == 0);
long rowCount = StatisticsUtil.isEmptyTable(table, analysisMethod) ? 0 : table.getRowCount();
infoBuilder.setRowCount(rowCount);
return infoBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.MaterializedIndexMeta;
Expand All @@ -37,7 +38,6 @@

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand Down Expand Up @@ -69,15 +69,17 @@ public void doExecute() throws Exception {
return;
}
Set<String> partitionNames = info.colToPartitions.get(info.colName);
if (StatisticsUtil.isEmptyTable(tbl, info.analysisMethod)
|| partitionNames == null || partitionNames.isEmpty()) {
if (partitionNames == null) {
LOG.warn("Table {}.{}.{}, partitionNames for column {} is null. ColToPartitions:[{}]",
info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions);
}
if (partitionNames == null || partitionNames.isEmpty()) {
LOG.warn("Table {}.{}.{}, partitionNames for column {} is null or empty. ColToPartitions:[{}]",
info.catalogId, info.dbId, info.tblId, info.colName, info.colToPartitions);
throw new RuntimeException();
}
if (info.rowCount == 0 && tableSample != null) {
StatsId statsId = new StatsId(concatColumnStatsId(), info.catalogId, info.dbId,
info.tblId, info.indexId, info.colName, null);
job.appendBuf(this, Arrays.asList(new ColStatsData(statsId)));
ColStatsData colStatsData = new ColStatsData(statsId);
Env.getCurrentEnv().getStatisticsCache().syncColStats(colStatsData);
job.appendBuf(this, Collections.singletonList(colStatsData));
return;
}
if (tableSample != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.external.HMSExternalTable;
import org.apache.doris.common.Config;
Expand All @@ -33,6 +32,7 @@
import org.apache.doris.statistics.AnalysisInfo.ScheduleType;
import org.apache.doris.statistics.util.StatisticsUtil;

import com.google.common.collect.Sets;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -228,10 +228,12 @@ protected AnalysisInfo getReAnalyzeRequiredPart(AnalysisInfo jobInfo) {
Set<String> partitionColumnNames = olapTable.getPartitionInfo().getPartitionColumns().stream()
.map(Column::getName).collect(Collectors.toSet());
colNames = partitionColumnNames.stream().collect(Collectors.joining(","));
Set<String> partitionNames = olapTable.getAllPartitions().stream()
.map(Partition::getName).collect(Collectors.toSet());
Set<String> partitions = Sets.newHashSet();
// Do not filter out unchanged partitions here, because doing so may cause unexpected behavior.
// Use a dummy partition instead so that the filtering is skipped.
partitions.add("Dummy Partition");
for (String column : partitionColumnNames) {
needRunPartitions.put(column, partitionNames);
needRunPartitions.put(column, partitions);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public void update(AnalysisInfo analyzedJob, TableIf tableIf) {
jobType = analyzedJob.jobType;
if (tableIf != null) {
if (tableIf instanceof OlapTable) {
rowCount = analyzedJob.emptyJob ? 0 : tableIf.getRowCount();
rowCount = analyzedJob.rowCount;
}
if (!analyzedJob.emptyJob && analyzedJob.colToPartitions.keySet()
.containsAll(tableIf.getBaseSchema().stream()
Expand Down
27 changes: 27 additions & 0 deletions regression-test/suites/statistics/analyze_stats.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -2825,6 +2825,33 @@ PARTITION `p599` VALUES IN (599)
assertEquals("521779.0", alter_result[0][5])
assertEquals("7.142863009760572", alter_result[0][6])

// Test analyze after a new empty partition is created.
sql """CREATE TABLE `part` (
`id` INT NULL,
`colint` INT NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
PARTITION BY RANGE(`id`)
(PARTITION p1 VALUES [("-2147483648"), ("10000")),
PARTITION p2 VALUES [("10000"), ("20000")))
DISTRIBUTED BY HASH(`id`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""

sql """analyze table part with sync;"""
sql """Insert into part values (1, 1), (10001, 10001);"""
sql """analyze table part with sync;"""
sleep(1000)
sql """alter table part add partition p3 VALUES [("20000"), ("30000"));"""
sql """analyze table part with sync;"""
sql """analyze table part with sync;"""
def new_part_result = sql """show column stats part(id)"""
assertEquals("2.0", new_part_result[0][2])
new_part_result = sql """show column stats part(colint)"""
assertEquals("2.0", new_part_result[0][2])

sql """DROP DATABASE IF EXISTS trigger"""
}
Expand Down

0 comments on commit 6339662

Please sign in to comment.