diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java index 455f60ddd37bdc..bc973ce818b9f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableAttributes.java @@ -33,9 +33,9 @@ public class TableAttributes { @SerializedName(value = "constraints") private final Map constraintsMap = new HashMap<>(); @SerializedName(value = "visibleVersion") - private long visibleVersion; + private volatile long visibleVersion; @SerializedName(value = "visibleVersionTime") - private long visibleVersionTime; + private volatile long visibleVersionTime; public TableAttributes() { this.visibleVersion = TABLE_INIT_VERSION; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index ccae5a6abb5151..db07ca7fa5ed2a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -115,6 +115,7 @@ import org.apache.doris.mtmv.BaseTableInfo; import org.apache.doris.mtmv.MTMVUtil; import org.apache.doris.mysql.privilege.PrivPredicate; +import org.apache.doris.nereids.stats.SimpleAggCacheMgr; import org.apache.doris.nereids.trees.plans.commands.CreateStreamCommand; import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType; import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionLikeOp; @@ -2011,6 +2012,9 @@ public void dropPartitionWithoutCheck(Database db, OlapTable olapTable, String p olapTable.updateVisibleVersionAndTime(version, versionTime); } } + if (partition != null && !isTempPartition) { + SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId()); + } // Here, we only wait for the EventProcessor to finish processing the event, // but regardless of the success or failure of the result, @@ -2042,15 +2046,18 @@ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundExcep OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP); olapTable.writeLock(); try { + Partition partition = null; if (info.isTempPartition()) { olapTable.dropTempPartition(info.getPartitionName(), true); } else { - Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), - info.isForceDrop()); + partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop()); if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) { Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime()); } } + if (partition != null && !info.isTempPartition()) { + SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId()); + } olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime()); // Replay set new partition loaded flag to true for auto analyze. TableStatsMeta stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java index cc9cf81d27523e..1b67a7cd35ddb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java @@ -317,6 +317,18 @@ public void removeStats(ColumnMinMaxKey key) { cache.synchronous().invalidate(key); } + /** + * Evict all cached simple aggregate values for a table. + */ + public void invalidateTable(long tableId) { + if (rowCountCache != null) { + rowCountCache.synchronous().invalidate(tableId); + } + if (cache != null) { + cache.synchronous().asMap().keySet().removeIf(key -> key.getTableId() == tableId); + } + } + /** * Get the cached row count for a table. * diff --git a/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy new file mode 100644 index 00000000000000..5519f0b51626b1 --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/rewrite_simple_agg_to_constant/auto_partition_recycle_count_cache.groovy @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("auto_partition_recycle_count_cache", "nonConcurrent") { + def oldCheckInterval = getFeConfig("dynamic_partition_check_interval_seconds") + + try { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE" + sql """ + CREATE TABLE auto_partition_recycle_count_cache ( + k0 DATETIME(6) NOT NULL + ) ENGINE=OLAP + DUPLICATE KEY(k0) + AUTO PARTITION BY RANGE(date_trunc(k0, 'day')) () + DISTRIBUTED BY HASH(k0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "partition.retention_count" = "3" + ) + """ + + sql """ + INSERT INTO auto_partition_recycle_count_cache VALUES + ('2020-01-01 00:00:00'), + ('2020-01-02 00:00:00'), + ('2020-01-03 00:00:00'), + ('2020-01-04 00:00:00'), + ('2020-01-05 00:00:00'), + ('2020-01-06 00:00:00') + """ + sql "sync" + + def partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache" + assertEquals(6, partitionCount.size()) + + sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + def cacheReady = false + for (int i = 0; i < 30; i++) { + def explainResult = sql "EXPLAIN SELECT count(*) FROM auto_partition_recycle_count_cache" + if (explainResult.toString().contains("constant exprs")) { + cacheReady = true + break + } + sleep(1000) + } + if (!cacheReady) { + if (isCloudMode()) { + logger.info("SimpleAggCacheMgr did not warm up in cloud mode, skip") + return + } + assertTrue(false, "SimpleAggCacheMgr cache did not warm up within 30 seconds") + } + + def countBeforeRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + assertEquals(6L, countBeforeRecycle[0][0] as long) + + setFeConfig("dynamic_partition_check_interval_seconds", 1) + def recycled = false + for (int i = 0; i < 30; i++) { + partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache" + if (partitionCount.size() == 3) { + recycled = true + break + } + sleep(1000) + } + assertTrue(recycled, "auto partition retention did not recycle partitions within 30 seconds") + + def countAfterRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache" + assertEquals(3L, countAfterRecycle[0][0] as long) + } finally { + setFeConfig("dynamic_partition_check_interval_seconds", oldCheckInterval) + sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE" + } +}