Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ public class TableAttributes {
@SerializedName(value = "constraints")
private final Map<String, Constraint> constraintsMap = new HashMap<>();
@SerializedName(value = "visibleVersion")
private long visibleVersion;
private volatile long visibleVersion;
@SerializedName(value = "visibleVersionTime")
private long visibleVersionTime;
private volatile long visibleVersionTime;

public TableAttributes() {
this.visibleVersion = TABLE_INIT_VERSION;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.doris.mtmv.BaseTableInfo;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.stats.SimpleAggCacheMgr;
import org.apache.doris.nereids.trees.plans.commands.CreateStreamCommand;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionLikeOp;
Expand Down Expand Up @@ -2011,6 +2012,9 @@ public void dropPartitionWithoutCheck(Database db, OlapTable olapTable, String p
olapTable.updateVisibleVersionAndTime(version, versionTime);
}
}
if (partition != null && !isTempPartition) {
SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId());
}

// Here, we only wait for the EventProcessor to finish processing the event,
// but regardless of the success or failure of the result,
Expand Down Expand Up @@ -2042,15 +2046,18 @@ public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundExcep
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
olapTable.writeLock();
try {
Partition partition = null;
if (info.isTempPartition()) {
olapTable.dropTempPartition(info.getPartitionName(), true);
} else {
Partition partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(),
info.isForceDrop());
partition = olapTable.dropPartition(info.getDbId(), info.getPartitionName(), info.isForceDrop());
if (!info.isForceDrop() && partition != null && info.getRecycleTime() != 0) {
Env.getCurrentRecycleBin().setRecycleTimeByIdForReplay(partition.getId(), info.getRecycleTime());
}
}
if (partition != null && !info.isTempPartition()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if we query on temp partition?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

对temp partition的count() 不会被记录到缓存中,也不会被应用到 select count() from t; 上。
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java 的tryRewrite() 函数里会检查是否有指定分区。如果指定分区就不rewrite

SimpleAggCacheMgr.internalInstance().invalidateTable(olapTable.getId());
}
olapTable.updateVisibleVersionAndTime(info.getVersion(), info.getVersionTime());
// Replay set new partition loaded flag to true for auto analyze.
TableStatsMeta stats = Env.getCurrentEnv().getAnalysisManager().findTableStatsStatus(olapTable.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,18 @@ public void removeStats(ColumnMinMaxKey key) {
cache.synchronous().invalidate(key);
}

/**
* Evict all cached simple aggregate values for a table.
*/
public void invalidateTable(long tableId) {
if (rowCountCache != null) {
rowCountCache.synchronous().invalidate(tableId);
}
if (cache != null) {
cache.synchronous().asMap().keySet().removeIf(key -> key.getTableId() == tableId);
}
}

/**
* Get the cached row count for a table.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("auto_partition_recycle_count_cache", "nonConcurrent") {
def oldCheckInterval = getFeConfig("dynamic_partition_check_interval_seconds")

try {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE"
sql """
CREATE TABLE auto_partition_recycle_count_cache (
k0 DATETIME(6) NOT NULL
) ENGINE=OLAP
DUPLICATE KEY(k0)
AUTO PARTITION BY RANGE(date_trunc(k0, 'day')) ()
DISTRIBUTED BY HASH(k0) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"partition.retention_count" = "3"
)
"""

sql """
INSERT INTO auto_partition_recycle_count_cache VALUES
('2020-01-01 00:00:00'),
('2020-01-02 00:00:00'),
('2020-01-03 00:00:00'),
('2020-01-04 00:00:00'),
('2020-01-05 00:00:00'),
('2020-01-06 00:00:00')
"""
sql "sync"

def partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache"
assertEquals(6, partitionCount.size())

sql "SELECT count(*) FROM auto_partition_recycle_count_cache"
def cacheReady = false
for (int i = 0; i < 30; i++) {
def explainResult = sql "EXPLAIN SELECT count(*) FROM auto_partition_recycle_count_cache"
if (explainResult.toString().contains("constant exprs")) {
cacheReady = true
break
}
sleep(1000)
}
if (!cacheReady) {
if (isCloudMode()) {
logger.info("SimpleAggCacheMgr did not warm up in cloud mode, skip")
return
}
assertTrue(false, "SimpleAggCacheMgr cache did not warm up within 30 seconds")
}

def countBeforeRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache"
assertEquals(6L, countBeforeRecycle[0][0] as long)

setFeConfig("dynamic_partition_check_interval_seconds", 1)
def recycled = false
for (int i = 0; i < 30; i++) {
partitionCount = sql "SHOW PARTITIONS FROM auto_partition_recycle_count_cache"
if (partitionCount.size() == 3) {
recycled = true
break
}
sleep(1000)
}
assertTrue(recycled, "auto partition retention did not recycle partitions within 30 seconds")

def countAfterRecycle = sql "SELECT count(*) FROM auto_partition_recycle_count_cache"
assertEquals(3L, countAfterRecycle[0][0] as long)
} finally {
setFeConfig("dynamic_partition_check_interval_seconds", oldCheckInterval)
sql "DROP TABLE IF EXISTS auto_partition_recycle_count_cache FORCE"
}
}
Loading