From 2628bd23985115c1bdb2d6fd613a1f3c508ef565 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Sun, 24 Sep 2023 18:53:18 +0800 Subject: [PATCH 1/7] keep cuncurrent save --- .../cache/schema/DataNodeSchemaCache.java | 43 +++++++++----- .../cache/schema/SchemaCacheEntry.java | 15 +---- .../cache/schema/TimeSeriesSchemaCache.java | 59 ++++++++++++++----- .../dualkeycache/impl/DualKeyCacheImpl.java | 59 ++++++++++--------- 4 files changed, 105 insertions(+), 71 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java index 90cebfee2bfef..7b50491ebc056 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/DataNodeSchemaCache.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.service.metric.MetricService; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree; @@ -42,7 +43,8 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; /** * This class takes the responsibility of metadata cache management of all DataRegions under @@ -191,6 +193,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) { } /** get SchemaCacheEntry and update last cache */ + @TestOnly public void updateLastCache( PartialPath devicePath, String measurement, @@ -207,20 +210,25 @@ public void updateLastCache( String[] measurements, MeasurementSchema[] measurementSchemas, boolean isAligned, - Function timeValuePairProvider, - Function shouldUpdateProvider, + IntFunction timeValuePairProvider, + IntPredicate shouldUpdateProvider, boolean highPriorityUpdate, Long latestFlushedTime) { - timeSeriesSchemaCache.updateLastCache( - database, - devicePath, - measurements, - measurementSchemas, - isAligned, - timeValuePairProvider, - shouldUpdateProvider, - highPriorityUpdate, - latestFlushedTime); + takeReadLock(); + try { + timeSeriesSchemaCache.updateLastCache( + database, + devicePath, + measurements, + measurementSchemas, + isAligned, + timeValuePairProvider, + shouldUpdateProvider, + highPriorityUpdate, + latestFlushedTime); + } finally { + releaseReadLock(); + } } /** @@ -233,8 +241,13 @@ public void updateLastCache( TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { - timeSeriesSchemaCache.updateLastCache( - storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime); + takeReadLock(); + try { + timeSeriesSchemaCache.updateLastCache( + storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime); + } finally { + releaseReadLock(); + } } public void invalidateAll() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java index 790558c9d9f44..aaa6f60400e7b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java @@ -38,7 +38,7 @@ public class SchemaCacheEntry implements IMeasurementSchemaInfo { private final Map tagMap; private final boolean isAligned; - private volatile ILastCacheContainer lastCacheContainer = null; + private final ILastCacheContainer lastCacheContainer = new LastCacheContainer(); public SchemaCacheEntry( String storageGroup, @@ -76,13 +76,6 @@ public boolean isAligned() { } public ILastCacheContainer getLastCacheContainer() { - if (lastCacheContainer == null) { - synchronized (this) { - if (lastCacheContainer == null) { - lastCacheContainer = new LastCacheContainer(); - } - } - } return lastCacheContainer; } @@ -106,13 +99,9 @@ public ILastCacheContainer getLastCacheContainer() { */ public static int estimateSize(SchemaCacheEntry schemaCacheEntry) { // each char takes 2B in Java - int lastCacheContainerSize = - schemaCacheEntry.getLastCacheContainer() == null - ? 0 - : schemaCacheEntry.getLastCacheContainer().estimateSize(); return 100 + 2 * schemaCacheEntry.getIMeasurementSchema().getMeasurementId().length() - + lastCacheContainerSize; + + schemaCacheEntry.getLastCacheContainer().estimateSize(); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java index 0d3321d9d73ed..b308e54354696 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java @@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.schema.view.LogicalViewSchema; +import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.metadata.view.InsertNonWritableViewException; @@ -46,7 +47,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.function.IntPredicate; public class TimeSeriesSchemaCache { @@ -222,12 +224,6 @@ public void computeValue(int index, SchemaCacheEntry value) { return new Pair<>(indexOfMissingMeasurements, missedPathStringList); } - public void put(ClusterSchemaTree schemaTree) { - for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) { - putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath); - } - } - public void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) { SchemaCacheEntry schemaCacheEntry = new SchemaCacheEntry( @@ -250,6 +246,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) { } /** get SchemaCacheEntry and update last cache */ + @TestOnly public void updateLastCache( PartialPath devicePath, String measurement, @@ -272,8 +269,8 @@ public void updateLastCache( String[] measurements, MeasurementSchema[] measurementSchemas, boolean isAligned, - Function timeValuePairProvider, - Function shouldUpdateProvider, + IntFunction timeValuePairProvider, + IntPredicate shouldUpdateProvider, boolean highPriorityUpdate, Long latestFlushedTime) { SchemaCacheEntry entry; @@ -292,7 +289,7 @@ public String[] getSecondKeyList() { @Override public int updateValue(int index, SchemaCacheEntry value) { - if (!shouldUpdateProvider.apply(index)) { + if (!shouldUpdateProvider.test(index)) { return 0; } if (value == null) { @@ -316,10 +313,25 @@ public int updateValue(int index, SchemaCacheEntry value) { } } } - - DataNodeLastCacheManager.updateLastCache( - entry, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); } + dualKeyCache.update( + new IDualKeyCacheUpdating() { + @Override + public PartialPath getFirstKey() { + return devicePath; + } + + @Override + public String[] getSecondKeyList() { + return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new); + } + + @Override + public int updateValue(int index, SchemaCacheEntry value) { + return DataNodeLastCacheManager.updateLastCache( + value, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); + } + }); } /** @@ -342,16 +354,31 @@ public void updateLastCache( entry = new SchemaCacheEntry( storageGroup, - (MeasurementSchema) measurementPath.getMeasurementSchema(), + measurementPath.getMeasurementSchema(), measurementPath.getTagMap(), measurementPath.isUnderAlignedEntity()); dualKeyCache.put(seriesPath.getDevicePath(), seriesPath.getMeasurement(), entry); } } } + dualKeyCache.update( + new IDualKeyCacheUpdating() { + @Override + public PartialPath getFirstKey() { + return measurementPath.getDevicePath(); + } - DataNodeLastCacheManager.updateLastCache( - entry, timeValuePair, highPriorityUpdate, latestFlushedTime); + @Override + public String[] getSecondKeyList() { + return new String[] {measurementPath.getMeasurement()}; + } + + @Override + public int updateValue(int index, SchemaCacheEntry value) { + return DataNodeLastCacheManager.updateLastCache( + value, timeValuePair, highPriorityUpdate, latestFlushedTime); + } + }); } public void invalidateAll() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java index aed8277024f15..f82f35bb980d3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java @@ -115,14 +115,17 @@ public void update(IDualKeyCacheUpdating updating) { if (cacheEntry == null) { updating.updateValue(i, null); } else { - int changeSize = updating.updateValue(i, cacheEntry.getValue()); - cacheEntryManager.access(cacheEntry); - if (changeSize != 0) { - cacheStats.increaseMemoryUsage(changeSize); - if (cacheStats.isExceedMemoryCapacity()) { - executeCacheEviction(changeSize); + int changeSize; + synchronized (cacheEntry) { + changeSize = updating.updateValue(i, cacheEntry.getValue()); + cacheEntryManager.access(cacheEntry); + if (changeSize != 0) { + cacheStats.increaseMemoryUsage(changeSize); } } + if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) { + executeCacheEviction(changeSize); + } hitCount++; } } @@ -194,31 +197,33 @@ private int evictOneCacheEntry() { if (evictCacheEntry == null) { return 0; } - AtomicInteger evictedSize = new AtomicInteger(0); - evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue())); + synchronized (evictCacheEntry) { + AtomicInteger evictedSize = new AtomicInteger(0); + evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue())); - ICacheEntryGroup belongedGroup = evictCacheEntry.getBelongedGroup(); - belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey()); - evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey())); + ICacheEntryGroup belongedGroup = evictCacheEntry.getBelongedGroup(); + belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey()); + evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey())); - if (belongedGroup.isEmpty()) { - firstKeyMap.compute( - belongedGroup.getFirstKey(), - (firstKey, cacheEntryGroup) -> { - if (cacheEntryGroup == null) { - // has been removed by other threads - return null; - } - if (cacheEntryGroup.isEmpty()) { - evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey)); - return null; - } + if (belongedGroup.isEmpty()) { + firstKeyMap.compute( + belongedGroup.getFirstKey(), + (firstKey, cacheEntryGroup) -> { + if (cacheEntryGroup == null) { + // has been removed by other threads + return null; + } + if (cacheEntryGroup.isEmpty()) { + evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey)); + return null; + } - // some other thread has put value to it - return cacheEntryGroup; - }); + // some other thread has put value to it + return cacheEntryGroup; + }); + } + return evictedSize.get(); } - return evictedSize.get(); } @Override From b8ad9c269be8b318daef157c311579e71d45324f Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Mon, 25 Sep 2023 09:50:33 +0800 Subject: [PATCH 2/7] fix --- .../plan/analyze/cache/schema/TimeSeriesSchemaCache.java | 3 +++ pom.xml | 1 + 2 files changed, 4 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java index b308e54354696..8b7469a571b35 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java @@ -328,6 +328,9 @@ public String[] getSecondKeyList() { @Override public int updateValue(int index, SchemaCacheEntry value) { + if (!shouldUpdateProvider.test(index)) { + return 0; + } return DataNodeLastCacheManager.updateLastCache( value, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); } diff --git a/pom.xml b/pom.xml index 5641df38630c7..784c6a3b9dcbc 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ distribution example library-udf + integration-test From d59db1447128b8695e41a6ea57ff31171462fa63 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Mon, 25 Sep 2023 11:34:53 +0800 Subject: [PATCH 3/7] fix --- .../plan/analyze/cache/schema/TimeSeriesSchemaCache.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java index 8b7469a571b35..1fcc568a9d7a6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java @@ -323,14 +323,14 @@ public PartialPath getFirstKey() { @Override public String[] getSecondKeyList() { - return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new); + return missingMeasurements.stream() + .filter(shouldUpdateProvider::test) + .map(i -> measurements[i]) + .toArray(String[]::new); } @Override public int updateValue(int index, SchemaCacheEntry value) { - if (!shouldUpdateProvider.test(index)) { - return 0; - } return DataNodeLastCacheManager.updateLastCache( value, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); } From 05c32bf6d7bed19a4ac883ee785b19efacedea08 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Mon, 25 Sep 2023 16:01:43 +0800 Subject: [PATCH 4/7] fix --- .../analyze/cache/schema/TimeSeriesSchemaCache.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java index 1fcc568a9d7a6..a1b8f508f7e71 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/TimeSeriesSchemaCache.java @@ -323,16 +323,16 @@ public PartialPath getFirstKey() { @Override public String[] getSecondKeyList() { - return missingMeasurements.stream() - .filter(shouldUpdateProvider::test) - .map(i -> measurements[i]) - .toArray(String[]::new); + return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new); } @Override public int updateValue(int index, SchemaCacheEntry value) { return DataNodeLastCacheManager.updateLastCache( - value, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime); + value, + timeValuePairProvider.apply(missingMeasurements.get(index)), + highPriorityUpdate, + latestFlushedTime); } }); } From fea0c225943fb9bb92543c4a0f8c4791d32df22a Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Tue, 26 Sep 2023 15:11:27 +0800 Subject: [PATCH 5/7] fix review --- .../cache/schema/SchemaCacheEntry.java | 24 +++++++++++++++++-- .../dualkeycache/impl/DualKeyCacheImpl.java | 16 +++++++++---- .../impl/FIFOCacheEntryManager.java | 7 +++++- .../schema/dualkeycache/impl/ICacheEntry.java | 2 ++ .../impl/LRUCacheEntryManager.java | 7 +++++- .../lastcache/DataNodeLastCacheManager.java | 4 ++-- pom.xml | 1 - 7 files changed, 49 insertions(+), 12 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java index aaa6f60400e7b..a977a64ab47d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java @@ -38,7 +38,7 @@ public class SchemaCacheEntry implements IMeasurementSchemaInfo { private final Map tagMap; private final boolean isAligned; - private final ILastCacheContainer lastCacheContainer = new LastCacheContainer(); + private ILastCacheContainer lastCacheContainer = null; public SchemaCacheEntry( String storageGroup, @@ -79,6 +79,22 @@ public ILastCacheContainer getLastCacheContainer() { return lastCacheContainer; } + /** + * Get and init last cache container. Notice that this method may change estimated size of this. + * + * @return last cache container which is not null + */ + public ILastCacheContainer getAndInitLastCacheContainer() { + if (lastCacheContainer == null) { + synchronized (this) { + if (lastCacheContainer == null) { + lastCacheContainer = new LastCacheContainer(); + } + } + } + return lastCacheContainer; + } + /** * Total basic 100B * @@ -99,9 +115,13 @@ public ILastCacheContainer getLastCacheContainer() { */ public static int estimateSize(SchemaCacheEntry schemaCacheEntry) { // each char takes 2B in Java + int lastCacheContainerSize = + schemaCacheEntry.getLastCacheContainer() == null + ? 0 + : schemaCacheEntry.getLastCacheContainer().estimateSize(); return 100 + 2 * schemaCacheEntry.getIMeasurementSchema().getMeasurementId().length() - + schemaCacheEntry.getLastCacheContainer().estimateSize(); + + lastCacheContainerSize; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java index f82f35bb980d3..c02a3173ccd68 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java @@ -115,12 +115,17 @@ public void update(IDualKeyCacheUpdating updating) { if (cacheEntry == null) { updating.updateValue(i, null); } else { - int changeSize; + int changeSize = 0; synchronized (cacheEntry) { - changeSize = updating.updateValue(i, cacheEntry.getValue()); - cacheEntryManager.access(cacheEntry); - if (changeSize != 0) { - cacheStats.increaseMemoryUsage(changeSize); + if (cacheEntry.getBelongedGroup() != null) { + // Only update the value when the cache entry is not evicted. + // If the cache entry is evicted, getBelongedGroup is null. + // Synchronized is to guarantee the cache entry is not evicted during the update. + changeSize = updating.updateValue(i, cacheEntry.getValue()); + cacheEntryManager.access(cacheEntry); + if (changeSize != 0) { + cacheStats.increaseMemoryUsage(changeSize); + } } } if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) { @@ -202,6 +207,7 @@ private int evictOneCacheEntry() { evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue())); ICacheEntryGroup belongedGroup = evictCacheEntry.getBelongedGroup(); + evictCacheEntry.setBelongedGroup(null); belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey()); evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey())); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java index 4d3c488d8eb85..778dc78f13ae5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java @@ -105,7 +105,7 @@ private int getNextIndex(AtomicInteger roundRobinIndex) { static class FIFOCacheEntry implements ICacheEntry { private final SK secondKey; - private final ICacheEntryGroup cacheEntryGroup; + private ICacheEntryGroup cacheEntryGroup; private V value; @@ -132,6 +132,11 @@ public ICacheEntryGroup getBelongedGroup() { return cacheEntryGroup; } + @Override + public void setBelongedGroup(ICacheEntryGroup belongedGroup) { + this.cacheEntryGroup = belongedGroup; + } + @Override public void replaceValue(V newValue) { this.value = newValue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java index 87fc118c61312..655b8c6e56ab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntry.java @@ -35,5 +35,7 @@ interface ICacheEntry { ICacheEntryGroup getBelongedGroup(); + void setBelongedGroup(ICacheEntryGroup belongedGroup); + void replaceValue(V newValue); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java index e5f8ff8749615..6ed49610673d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java @@ -103,7 +103,7 @@ private LRULinkedList getBelongedList(LRUCacheEntry cacheEntry) { static class LRUCacheEntry implements ICacheEntry { private final SK secondKey; - private final ICacheEntryGroup cacheEntryGroup; + private ICacheEntryGroup cacheEntryGroup; private V value; @@ -131,6 +131,11 @@ public ICacheEntryGroup getBelongedGroup() { return cacheEntryGroup; } + @Override + public void setBelongedGroup(ICacheEntryGroup belongedGroup) { + this.cacheEntryGroup = belongedGroup; + } + @Override public void replaceValue(V newValue) { this.value = newValue; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java index 678c7e8b43069..83de186788b18 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java @@ -43,7 +43,7 @@ public static TimeValuePair getLastCache(SchemaCacheEntry entry) { return null; } ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer(); - return lastCacheContainer.getCachedLast(); + return lastCacheContainer == null ? null : lastCacheContainer.getCachedLast(); } /** @@ -63,7 +63,7 @@ public static int updateLastCache( if (!CACHE_ENABLED || null == entry) { return 0; } - ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer(); + ILastCacheContainer lastCacheContainer = entry.getAndInitLastCacheContainer(); return lastCacheContainer.updateCachedLast( timeValuePair, highPriorityUpdate, latestFlushedTime); } diff --git a/pom.xml b/pom.xml index 784c6a3b9dcbc..5641df38630c7 100644 --- a/pom.xml +++ b/pom.xml @@ -87,7 +87,6 @@ distribution example library-udf - integration-test From 65949c827b3b9b372284b527d09811590b27313d Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Tue, 26 Sep 2023 15:13:27 +0800 Subject: [PATCH 6/7] volatile --- .../queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java | 2 +- .../cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java | 2 +- .../cache/schema/dualkeycache/impl/LRUCacheEntryManager.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java index a977a64ab47d1..5f6a60914e99a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java @@ -38,7 +38,7 @@ public class SchemaCacheEntry implements IMeasurementSchemaInfo { private final Map tagMap; private final boolean isAligned; - private ILastCacheContainer lastCacheContainer = null; + private volatile ILastCacheContainer lastCacheContainer = null; public SchemaCacheEntry( String storageGroup, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java index 778dc78f13ae5..661e559dd8254 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/FIFOCacheEntryManager.java @@ -105,7 +105,7 @@ private int getNextIndex(AtomicInteger roundRobinIndex) { static class FIFOCacheEntry implements ICacheEntry { private final SK secondKey; - private ICacheEntryGroup cacheEntryGroup; + private volatile ICacheEntryGroup cacheEntryGroup; private V value; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java index 6ed49610673d4..a350c7c98ce83 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/LRUCacheEntryManager.java @@ -103,7 +103,7 @@ private LRULinkedList getBelongedList(LRUCacheEntry cacheEntry) { static class LRUCacheEntry implements ICacheEntry { private final SK secondKey; - private ICacheEntryGroup cacheEntryGroup; + private volatile ICacheEntryGroup cacheEntryGroup; private V value; From 1e3aa282ec3bdda0de21e7c91a52f699bd9ff07f Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Tue, 26 Sep 2023 16:59:47 +0800 Subject: [PATCH 7/7] fix ut --- .../cache/schema/SchemaCacheEntry.java | 19 +++++++++++-------- .../lastcache/DataNodeLastCacheManager.java | 4 +--- .../cache/dualkeycache/DualKeyCacheTest.java | 4 +--- 3 files changed, 13 insertions(+), 14 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java index 5f6a60914e99a..8df16a91ba580 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/SchemaCacheEntry.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.ILastCacheContainer; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; +import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -79,20 +80,22 @@ public ILastCacheContainer getLastCacheContainer() { return lastCacheContainer; } - /** - * Get and init last cache container. Notice that this method may change estimated size of this. - * - * @return last cache container which is not null - */ - public ILastCacheContainer getAndInitLastCacheContainer() { + public int updateLastCache( + TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { + if (lastCacheContainer == null) { synchronized (this) { if (lastCacheContainer == null) { - lastCacheContainer = new LastCacheContainer(); + ILastCacheContainer tmp = new LastCacheContainer(); + int changeSize = tmp.estimateSize(); + changeSize += tmp.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); + lastCacheContainer = tmp; + return changeSize; } } } - return lastCacheContainer; + return lastCacheContainer.updateCachedLast( + timeValuePair, highPriorityUpdate, latestFlushedTime); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java index 83de186788b18..6790f2cb645ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/lastcache/DataNodeLastCacheManager.java @@ -63,8 +63,6 @@ public static int updateLastCache( if (!CACHE_ENABLED || null == entry) { return 0; } - ILastCacheContainer lastCacheContainer = entry.getAndInitLastCacheContainer(); - return lastCacheContainer.updateCachedLast( - timeValuePair, highPriorityUpdate, latestFlushedTime); + return entry.updateLastCache(timeValuePair, highPriorityUpdate, latestFlushedTime); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java index 2db2b717a3183..4cef9502f8e20 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/dualkeycache/DualKeyCacheTest.java @@ -189,9 +189,7 @@ public int updateValue(int index, SchemaCacheEntry value) { } }); int tmp = SchemaCacheEntry.estimateSize(schemaCacheEntry); - schemaCacheEntry - .getLastCacheContainer() - .updateCachedLast(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L); + schemaCacheEntry.updateLastCache(new TimeValuePair(1L, new TsPrimitiveType.TsInt(1)), true, 0L); expectedSize += (SchemaCacheEntry.estimateSize(schemaCacheEntry) - tmp) * 2; Assert.assertEquals(expectedSize, dualKeyCache.stats().memoryUsage()); }