Skip to content

Commit

Permalink
fix sql cache return old value when truncate partition
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 committed May 11, 2024
1 parent bc13672 commit 45f8994
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1308,12 +1308,18 @@ public class Config extends ConfigBase {
* Minimum interval between last version when caching results,
* This parameter distinguishes between offline and real-time updates
*/
@ConfField(mutable = true, masterOnly = false)
public static int cache_last_version_interval_second = 30;

/**
* Expire sql cache in frontend time
*/
@ConfField(
mutable = true,
masterOnly = false,
callbackClassString = "org.apache.doris.common.NereidsSqlCacheManager$UpdateConfig"
)
public static int cache_last_version_interval_second = 30;
public static int expire_sql_cache_in_fe_second = 300;

/**
* Set the maximum number of rows that can be cached
Expand Down
4 changes: 1 addition & 3 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
Original file line number Diff line number Diff line change
Expand Up @@ -780,9 +780,7 @@ public Env(boolean isCheckpointCatalog) {
this.mtmvService = new MTMVService();
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager(
Config.sql_cache_manage_num, Config.cache_last_version_interval_second
);
this.sqlCacheManager = new NereidsSqlCacheManager();
}

public static void destroyCheckpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,11 @@ public class NereidsSqlCacheManager {
// value: SqlCacheContext
private volatile Cache<String, SqlCacheContext> sqlCaches;

public NereidsSqlCacheManager(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCaches = buildSqlCaches(sqlCacheNum, cacheIntervalSeconds);
public NereidsSqlCacheManager() {
sqlCaches = buildSqlCaches(
Config.sql_cache_manage_num,
Config.expire_sql_cache_in_fe_second
);
}

public static synchronized void updateConfig() {
Expand All @@ -90,22 +93,24 @@ public static synchronized void updateConfig() {

Cache<String, SqlCacheContext> sqlCaches = buildSqlCaches(
Config.sql_cache_manage_num,
Config.cache_last_version_interval_second
Config.expire_sql_cache_in_fe_second
);
sqlCaches.putAll(sqlCacheManager.sqlCaches.asMap());
sqlCacheManager.sqlCaches = sqlCaches;
}

private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long cacheIntervalSeconds) {
sqlCacheNum = sqlCacheNum < 0 ? 100 : sqlCacheNum;
cacheIntervalSeconds = cacheIntervalSeconds < 0 ? 30 : cacheIntervalSeconds;

return Caffeine.newBuilder()
.maximumSize(sqlCacheNum)
.expireAfterAccess(Duration.ofSeconds(cacheIntervalSeconds))
private static Cache<String, SqlCacheContext> buildSqlCaches(int sqlCacheNum, long expireAfterAccessSeconds) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
// auto evict cache when jvm memory too low
.softValues()
.build();
.softValues();
if (sqlCacheNum > 0) {
cacheBuilder = cacheBuilder.maximumSize(sqlCacheNum);
}
if (expireAfterAccessSeconds > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
}

return cacheBuilder.build();
}

/** tryAddFeCache */
Expand Down Expand Up @@ -237,9 +242,6 @@ public Optional<LogicalSqlCache> tryParseSql(ConnectContext connectContext, Stri
}

private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
long latestPartitionTime = sqlCacheContext.getLatestPartitionTime();
long latestPartitionVersion = sqlCacheContext.getLatestPartitionVersion();

if (sqlCacheContext.hasUnsupportedTables()) {
return true;
}
Expand All @@ -255,7 +257,7 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
long cacheTableTime = scanTable.latestTimestamp;
long currentTableVersion = olapTable.getVisibleVersion();
long cacheTableVersion = scanTable.latestVersion;
// some partitions have been dropped, or delete or update or insert rows into new partition?
// have some partitions been dropped, deleted, updated or replaced, or have rows been inserted into a new partition?
if (currentTableTime > cacheTableTime
|| (currentTableTime == cacheTableTime && currentTableVersion > cacheTableVersion)) {
return true;
Expand All @@ -264,9 +266,7 @@ private boolean tablesOrDataChanged(Env env, SqlCacheContext sqlCacheContext) {
for (Long scanPartitionId : scanTable.getScanPartitions()) {
Partition partition = olapTable.getPartition(scanPartitionId);
// partition == null: is this partition truncated?
if (partition == null || partition.getVisibleVersionTime() > latestPartitionTime
|| (partition.getVisibleVersionTime() == latestPartitionTime
&& partition.getVisibleVersion() > latestPartitionVersion)) {
if (partition == null) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,11 +701,11 @@ private CacheTable buildCacheTableForOlapScanNode(OlapScanNode node) {
scanTables.add(scanTable);
for (Long partitionId : node.getSelectedPartitionIds()) {
Partition partition = olapTable.getPartition(partitionId);
scanTable.addScanPartition(partitionId);
if (partition.getVisibleVersionTime() >= cacheTable.latestPartitionTime) {
cacheTable.latestPartitionId = partition.getId();
cacheTable.latestPartitionTime = partition.getVisibleVersionTime();
cacheTable.latestPartitionVersion = partition.getVisibleVersion();
scanTable.addScanPartition(partitionId);
}
}
return cacheTable;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ suite("parse_sql_from_sql_cache") {
}
}


sql "ADMIN SET FRONTEND CONFIG ('cache_last_version_interval_second' = '10')"

combineFutures(
Expand Down Expand Up @@ -714,6 +713,43 @@ suite("parse_sql_from_sql_cache") {
assertHasCache "select * from test_use_plan_cache20 where id=999"
def result6 = sql "select * from test_use_plan_cache20 where id=999"
assertTrue(result6.isEmpty())
}),
extraThread("test_truncate_partition", {
// regression test: truncating a partition must invalidate the sql cache,
// otherwise a cached plan would keep returning the pre-truncate (stale) rows
sql "drop table if exists test_use_plan_cache21"
sql """create table test_use_plan_cache21 (
id int,
dt int
)
partition by range(dt)
(
partition dt1 values [('1'), ('2')),
partition dt2 values [('2'), ('3'))
)
distributed by hash(id)
properties('replication_num'='1')"""



// one row into each partition; the short sleep keeps their
// visible-version timestamps distinct
sql "insert into test_use_plan_cache21 values('2', '2')"
sleep(100)
sql "insert into test_use_plan_cache21 values('1', '1')"

// the sql cache only becomes usable once the last data change is older than
// cache_last_version_interval_second (set to 10s earlier in this suite)
sleep(10000)

sql "set enable_nereids_planner=true"
sql "set enable_fallback_to_original_planner=false"
sql "set enable_sql_cache=true"

// first execution populates the cache; a repeat of the same sql should hit it
assertNoCache "select * from test_use_plan_cache21"
def result1 = sql "select * from test_use_plan_cache21"
assertTrue(result1.size() == 2)
assertHasCache "select * from test_use_plan_cache21"

// truncating partition dt2 must evict/invalidate the cached entry:
// the follow-up query must re-plan and see only the one remaining row
sql "truncate table test_use_plan_cache21 partition dt2"
assertNoCache "select * from test_use_plan_cache21"
def result2 = sql "select * from test_use_plan_cache21"
assertTrue(result2.size() == 1)
})
).get()
}

0 comments on commit 45f8994

Please sign in to comment.