From 0e8f934a3ef87055fe0106a0a9046d40a38174f2 Mon Sep 17 00:00:00 2001 From: englefly Date: Tue, 12 May 2026 16:04:07 +0800 Subject: [PATCH] [fix](fe) Fix simple aggregate cache after partition recycle ### What problem does this PR solve? Issue Number: None Related PR: None Problem Summary: Auto partition recycle updates table visible version from the dynamic partition thread, while simple aggregate cache validation reads the table visible version without holding the table lock. Make table visible version fields visible across threads and defensively invalidate simple aggregate cache on partition drop/replay to avoid stale count(*) constants after partition recycle. ### Release note None ### Check List (For Author) - Test: Regression test / Build - FE checkstyle: cd fe && mvn checkstyle:check -pl fe-core -DskipTests - FE build: ./build.sh --fe - Regression test: ./run-regression-test.sh --run -d nereids_rules_p0/rewrite_simple_agg_to_constant -s auto_partition_recycle_count_cache - Manual test: DORIS_QUERY_PORT=39031 DORIS_HTTP_PORT=38031 bash test.sh - Behavior changed: No - Does this need documentation: No Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../apache/doris/catalog/TableAttributes.java | 4 +- .../doris/datasource/InternalCatalog.java | 11 ++- .../nereids/stats/SimpleAggCacheMgr.java | 12 +++ .../auto_partition_recycle_count_cache.groovy | 91 +++++++++++++++++++ 4 files changed, 114 insertions(+), 4 deletions(-) create mode 100644 regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java index 455f60ddd37bdc..bc973ce818b9f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java @@ -33,9 +33,9 @@ public class TableAttributes { @SerializedName(value = "constraints") private final Map constraintsMap = new HashMap<>(); @SerializedName(value = "visibleVersion") - private long visibleVersion; + private volatile long visibleVersion; @SerializedName(value = "visibleVersionTime") - private long visibleVersionTime; + private volatile long visibleVersionTime; public TableAttributes() { this.visibleVersion = TABLE_INIT_VERSION; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index ccae5a6abb5151..db07ca7fa5ed2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -115,6 +115,7 @@ import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; import org.apache.doris.nereids.trees.plans.commands.CreateStreamCommand; import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType; import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionLikeOp; @@ -2011,6 +2012,9 @@ public void dropPartitionWithoutCheck(Database db, OlapTable olapTable, String p olapTable.updateVisibleVersionAndTime(version, versionTime); } } + if (partition != null && !isTempPartition) { + SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId()); + } // Here, we only wait for the EventProcessor to finish processing the event, // but regardless of the success or failure of the result, @@ -2042,15 +2046,18 @@ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundExcep OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP); olapTable.writeLock(); try { + Partition partition = null; if (info.isTempPartition()) { olapTable.dropTempPartition(info.getPartitionName(), true); } else { - Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), - info.isForceDrop()); + partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop()); if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) { Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime()); } } + if (partition != null && !info.isTempPartition()) { + SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId()); + } olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime()); // Replay set new partition loaded flag to true for auto analyze. TableStatsMeta stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java index cc9cf81d27523e..1b67a7cd35ddb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java @@ -317,6 +317,18 @@ public void removeStats(ColumnMinMaxKey key) { cache.synchronous().invalidate(key); } + /** + * Evict all cached simple aggregate values for a table. + */ + public void invalidateTable(long tableId) { + if (rowCountCache != null) { + rowCountCache.synchronous().invalidate(tableId); + } + if (cache != null) { + cache.synchronous().asMap().keySet().removeIf(key -> key.getTableId() == tableId); + } + } + /** * Get the cached row count for a table. * diff --git a/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy new file mode 100644 index 00000000000000..5519f0b51626b1 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("auto_partition_recycle_count_cache", "nonConcurrent") { + def oldCheckInterval = getFeConfig("dynamic_partition_check_interval_seconds") + + try { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE" + sql """ + CREATE TABLE auto_partition_recycle_count_cache ( + k0 DATETIME(6) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(k0) + AUTO PARTITION BY RANGE(date_trunc(k0, 'day')) () + DISTRIBUTED BY HASH(k0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "partition.retention_count" = "3" + ) + """ + + sql """ + INSERT INTO auto_partition_recycle_count_cache VALUES + ('2020-01-01 00:00:00'), + ('2020-01-02 00:00:00'), + ('2020-01-03 00:00:00'), + ('2020-01-04 00:00:00'), + ('2020-01-05 00:00:00'), + ('2020-01-06 00:00:00') + """ + sql "sync" + + def partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache" + assertEquals(6, partitionCount.size()) + + sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + def cacheReady = false + for (int i = 0; i < 30; i++) { + def explainResult = sql "EXPLAIN SELECT count(*) FROM auto_partition_recycle_count_cache" + if (explainResult.toString().contains("constant exprs")) { + cacheReady = true + break + } + sleep(1000) + } + if (!cacheReady) { + if (isCloudMode()) { + logger.info("SimpleAggCacheMgr did not warm up in cloud mode, skip") + return + } + assertTrue(false, "SimpleAggCacheMgr cache did not warm up within 30 seconds") + } + + def countBeforeRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + assertEquals(6L, countBeforeRecycle[0][0] as long) + + setFeConfig("dynamic_partition_check_interval_seconds", 1) + def recycled = false + for (int i = 0; i < 30; i++) { + partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache" + if (partitionCount.size() == 3) { + recycled = true + break + } + sleep(1000) + } + assertTrue(recycled, "auto partition retention did not recycle partitions within 30 seconds") + + def countAfterRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + assertEquals(3L, countAfterRecycle[0][0] as long) + } finally { + setFeConfig("dynamic_partition_check_interval_seconds", oldCheckInterval) + sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE" + } +}