diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java index 90cebfee2bfef..7b50491ebc056 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; @@ -42,7 +43,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; /** * This class takes the responsibility of metadata cache management of all DataRegions under @@ -191,6 +193,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) { } /** get SchemaCacheEntry and update last cache */ + @TestOnly public void updateLastCache( PartialPath devicePath, String measurement, @@ -207,20 +210,25 @@ public void updateLastCache( String[] measurements, MeasurementSchema[] measurementSchemas, boolean isAligned, - Function timeValuePairProvider, - Function shouldUpdateProvider, + IntFunction timeValuePairProvider, + IntPredicate shouldUpdateProvider, boolean highPriorityUpdate, Long latestFlushedTime) { - timeSeriesSchemaCache.updateLastCache( - database, - devicePath, - measurements, - measurementSchemas, - isAligned, - timeValuePairProvider, - shouldUpdateProvider, - highPriorityUpdate, - latestFlushedTime); + takeReadLock(); + try { + timeSeriesSchemaCache.updateLastCache( + database, + devicePath, + measurements, + measurementSchemas, + isAligned, + timeValuePairProvider, + shouldUpdateProvider, + highPriorityUpdate, + latestFlushedTime); + } finally { + releaseReadLock(); + } } /** @@ -233,8 +241,13 @@ public void updateLastCache( TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { - timeSeriesSchemaCache.updateLastCache( - storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime); + takeReadLock(); + try { + timeSeriesSchemaCache.updateLastCache( + storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime); + } finally { + releaseReadLock(); + } } public void invalidateAll() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java index 790558c9d9f44..8df16a91ba580 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.ILastCacheContainer; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -76,14 +77,25 @@ public boolean isAligned() { } public ILastCacheContainer getLastCacheContainer() { + return lastCacheContainer; + } + + public int updateLastCache( + TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { + if (lastCacheContainer == null) { synchronized (this) { if (lastCacheContainer == null) { - lastCacheContainer = new LastCacheContainer(); + ILastCacheContainer tmp = new LastCacheContainer(); + int changeSize = tmp.estimateSize(); + changeSize += tmp.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); + lastCacheContainer = tmp; + return changeSize; } } } - return lastCacheContainer; + return lastCacheContainer.updateCachedLast( + timeValuePair, highPriorityUpdate, latestFlushedTime); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java index 0d3321d9d73ed..a1b8f508f7e71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.LogicalViewSchema; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.view.InsertNonWritableViewException; @@ -46,7 +47,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; public class TimeSeriesSchemaCache { @@ -222,12 +224,6 @@ public void computeValue(int index, SchemaCacheEntry value) { return new Pair<>(indexOfMissingMeasurements, missedPathStringList); } - public void put(ClusterSchemaTree schemaTree) { - for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) { - putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath); - } - } - public void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) { SchemaCacheEntry schemaCacheEntry = new SchemaCacheEntry( @@ -250,6 +246,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) { } /** get SchemaCacheEntry and update last cache */ + @TestOnly public void updateLastCache( PartialPath devicePath, String measurement, @@ -272,8 +269,8 @@ public void updateLastCache( String[] measurements, MeasurementSchema[] measurementSchemas, boolean isAligned, - Function timeValuePairProvider, - Function shouldUpdateProvider, + IntFunction timeValuePairProvider, + IntPredicate shouldUpdateProvider, boolean highPriorityUpdate, Long latestFlushedTime) { SchemaCacheEntry entry; @@ -292,7 +289,7 @@ public String[] getSecondKeyList() { @Override public int updateValue(int index, SchemaCacheEntry value) { - if (!shouldUpdateProvider.apply(index)) { + if (!shouldUpdateProvider.test(index)) { return 0; } if (value == null) { @@ -316,10 +313,28 @@ public int updateValue(int index, SchemaCacheEntry value) { } } } - - DataNodeLastCacheManager.updateLastCache( - entry, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); } + dualKeyCache.update( + new IDualKeyCacheUpdating() { + @Override + public PartialPath getFirstKey() { + return devicePath; + } + + @Override + public String[] getSecondKeyList() { + return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new); + } + + @Override + public int updateValue(int index, SchemaCacheEntry value) { + return DataNodeLastCacheManager.updateLastCache( + value, + timeValuePairProvider.apply(missingMeasurements.get(index)), + highPriorityUpdate, + latestFlushedTime); + } + }); } /** @@ -342,16 +357,31 @@ public void updateLastCache( entry = new SchemaCacheEntry( storageGroup, - (MeasurementSchema) measurementPath.getMeasurementSchema(), + measurementPath.getMeasurementSchema(), measurementPath.getTagMap(), measurementPath.isUnderAlignedEntity()); dualKeyCache.put(seriesPath.getDevicePath(), seriesPath.getMeasurement(), entry); } } } + dualKeyCache.update( + new IDualKeyCacheUpdating() { + @Override + public PartialPath getFirstKey() { + return measurementPath.getDevicePath(); + } - DataNodeLastCacheManager.updateLastCache( - entry, timeValuePair, highPriorityUpdate, latestFlushedTime); + @Override + public String[] getSecondKeyList() { + return new String[] {measurementPath.getMeasurement()}; + } + + @Override + public int updateValue(int index, SchemaCacheEntry value) { + return DataNodeLastCacheManager.updateLastCache( + value, timeValuePair, highPriorityUpdate, latestFlushedTime); + } + }); } public void invalidateAll() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java index aed8277024f15..c02a3173ccd68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java @@ -115,14 +115,22 @@ public void update(IDualKeyCacheUpdating updating) { if (cacheEntry == null) { updating.updateValue(i, null); } else { - int changeSize = updating.updateValue(i, cacheEntry.getValue()); - cacheEntryManager.access(cacheEntry); - if (changeSize != 0) { - cacheStats.increaseMemoryUsage(changeSize); - if (cacheStats.isExceedMemoryCapacity()) { - executeCacheEviction(changeSize); + int changeSize = 0; + synchronized (cacheEntry) { + if (cacheEntry.getBelongedGroup() != null) { + // Only update the value when the cache entry is not evicted. + // If the cache entry is evicted, getBelongedGroup is null. + // Synchronized is to guarantee the cache entry is not evicted during the update. + changeSize = updating.updateValue(i, cacheEntry.getValue()); + cacheEntryManager.access(cacheEntry); + if (changeSize != 0) { + cacheStats.increaseMemoryUsage(changeSize); + } } } + if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) { + executeCacheEviction(changeSize); + } hitCount++; } } @@ -194,31 +202,34 @@ private int evictOneCacheEntry() { if (evictCacheEntry == null) { return 0; } - AtomicInteger evictedSize = new AtomicInteger(0); - evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue())); + synchronized (evictCacheEntry) { + AtomicInteger evictedSize = new AtomicInteger(0); + evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue())); - ICacheEntryGroup belongedGroup = evictCacheEntry.getBelongedGroup(); - belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey()); - evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey())); + ICacheEntryGroup belongedGroup = evictCacheEntry.getBelongedGroup(); + evictCacheEntry.setBelongedGroup(null); + belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey()); + evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey())); - if (belongedGroup.isEmpty()) { - firstKeyMap.compute( - belongedGroup.getFirstKey(), - (firstKey, cacheEntryGroup) -> { - if (cacheEntryGroup == null) { - // has been removed by other threads - return null; - } - if (cacheEntryGroup.isEmpty()) { - evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey)); - return null; - } + if (belongedGroup.isEmpty()) { + firstKeyMap.compute( + belongedGroup.getFirstKey(), + (firstKey, cacheEntryGroup) -> { + if (cacheEntryGroup == null) { + // has been removed by other threads + return null; + } + if (cacheEntryGroup.isEmpty()) { + evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey)); + return null; + } - // some other thread has put value to it - return cacheEntryGroup; - }); + // some other thread has put value to it + return cacheEntryGroup; + }); + } + return evictedSize.get(); } - return evictedSize.get(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java index 4d3c488d8eb85..661e559dd8254 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java @@ -105,7 +105,7 @@ private int getNextIndex(AtomicInteger roundRobinIndex) { static class FIFOCacheEntry implements ICacheEntry { private final SK secondKey; - private final ICacheEntryGroup cacheEntryGroup; + private volatile ICacheEntryGroup cacheEntryGroup; private V value; @@ -132,6 +132,11 @@ public ICacheEntryGroup getBelongedGroup() { return cacheEntryGroup; } + @Override + public void setBelongedGroup(ICacheEntryGroup belongedGroup) { + this.cacheEntryGroup = belongedGroup; + } + @Override public void replaceValue(V newValue) { this.value = newValue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java index 87fc118c61312..655b8c6e56ab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java @@ -35,5 +35,7 @@ interface ICacheEntry { ICacheEntryGroup getBelongedGroup(); + void setBelongedGroup(ICacheEntryGroup belongedGroup); + void replaceValue(V newValue); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java index e5f8ff8749615..a350c7c98ce83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java @@ -103,7 +103,7 @@ private LRULinkedList getBelongedList(LRUCacheEntry cacheEntry) { static class LRUCacheEntry implements ICacheEntry { private final SK secondKey; - private final ICacheEntryGroup cacheEntryGroup; + private volatile ICacheEntryGroup cacheEntryGroup; private V value; @@ -131,6 +131,11 @@ public ICacheEntryGroup getBelongedGroup() { return cacheEntryGroup; } + @Override + public void setBelongedGroup(ICacheEntryGroup belongedGroup) { + this.cacheEntryGroup = belongedGroup; + } + @Override public void replaceValue(V newValue) { this.value = newValue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java index 678c7e8b43069..6790f2cb645ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java @@ -43,7 +43,7 @@ public static TimeValuePair getLastCache(SchemaCacheEntry entry) { return null; } ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer(); - return lastCacheContainer.getCachedLast(); + return lastCacheContainer == null ? null : lastCacheContainer.getCachedLast(); } /** @@ -63,8 +63,6 @@ public static int updateLastCache( if (!CACHE_ENABLED || null == entry) { return 0; } - ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer(); - return lastCacheContainer.updateCachedLast( - timeValuePair, highPriorityUpdate, latestFlushedTime); + return entry.updateLastCache(timeValuePair, highPriorityUpdate, latestFlushedTime); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java index 2db2b717a3183..4cef9502f8e20 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java @@ -189,9 +189,7 @@ public int updateValue(int index, SchemaCacheEntry value) { } }); int tmp = SchemaCacheEntry.estimateSize(schemaCacheEntry); - schemaCacheEntry - .getLastCacheContainer() - .updateCachedLast(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L); + schemaCacheEntry.updateLastCache(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L); expectedSize += (SchemaCacheEntry.estimateSize(schemaCacheEntry) - tmp) * 2; Assert.assertEquals(expectedSize, dualKeyCache.stats().memoryUsage()); }