Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
Expand All @@ -42,7 +43,8 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;

/**
* This class takes the responsibility of metadata cache management of all DataRegions under
Expand Down Expand Up @@ -191,6 +193,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) {
}

/** get SchemaCacheEntry and update last cache */
@TestOnly
public void updateLastCache(
PartialPath devicePath,
String measurement,
Expand All @@ -207,20 +210,25 @@ public void updateLastCache(
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
Function<Integer, TimeValuePair> timeValuePairProvider,
Function<Integer, Boolean> shouldUpdateProvider,
IntFunction<TimeValuePair> timeValuePairProvider,
IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
timeSeriesSchemaCache.updateLastCache(
database,
devicePath,
measurements,
measurementSchemas,
isAligned,
timeValuePairProvider,
shouldUpdateProvider,
highPriorityUpdate,
latestFlushedTime);
takeReadLock();
try {
timeSeriesSchemaCache.updateLastCache(
database,
devicePath,
measurements,
measurementSchemas,
isAligned,
timeValuePairProvider,
shouldUpdateProvider,
highPriorityUpdate,
latestFlushedTime);
} finally {
releaseReadLock();
}
}

/**
Expand All @@ -233,8 +241,13 @@ public void updateLastCache(
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime) {
timeSeriesSchemaCache.updateLastCache(
storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
takeReadLock();
try {
timeSeriesSchemaCache.updateLastCache(
storageGroup, measurementPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
} finally {
releaseReadLock();
}
}

public void invalidateAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.ILastCacheContainer;
import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.lastcache.LastCacheContainer;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;

Expand Down Expand Up @@ -76,14 +77,25 @@ public boolean isAligned() {
}

public ILastCacheContainer getLastCacheContainer() {
return lastCacheContainer;
}

public int updateLastCache(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {

if (lastCacheContainer == null) {
synchronized (this) {
if (lastCacheContainer == null) {
lastCacheContainer = new LastCacheContainer();
ILastCacheContainer tmp = new LastCacheContainer();
int changeSize = tmp.estimateSize();
changeSize += tmp.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
lastCacheContainer = tmp;
return changeSize;
}
}
}
return lastCacheContainer;
return lastCacheContainer.updateCachedLast(
timeValuePair, highPriorityUpdate, latestFlushedTime);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.view.LogicalViewSchema;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.view.InsertNonWritableViewException;
Expand All @@ -46,7 +47,8 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.IntPredicate;

public class TimeSeriesSchemaCache {

Expand Down Expand Up @@ -222,12 +224,6 @@ public void computeValue(int index, SchemaCacheEntry value) {
return new Pair<>(indexOfMissingMeasurements, missedPathStringList);
}

public void put(ClusterSchemaTree schemaTree) {
for (MeasurementPath measurementPath : schemaTree.getAllMeasurement()) {
putSingleMeasurementPath(schemaTree.getBelongedDatabase(measurementPath), measurementPath);
}
}

public void putSingleMeasurementPath(String storageGroup, MeasurementPath measurementPath) {
SchemaCacheEntry schemaCacheEntry =
new SchemaCacheEntry(
Expand All @@ -250,6 +246,7 @@ public TimeValuePair getLastCache(PartialPath seriesPath) {
}

/** get SchemaCacheEntry and update last cache */
@TestOnly
public void updateLastCache(
PartialPath devicePath,
String measurement,
Expand All @@ -272,8 +269,8 @@ public void updateLastCache(
String[] measurements,
MeasurementSchema[] measurementSchemas,
boolean isAligned,
Function<Integer, TimeValuePair> timeValuePairProvider,
Function<Integer, Boolean> shouldUpdateProvider,
IntFunction<TimeValuePair> timeValuePairProvider,
IntPredicate shouldUpdateProvider,
boolean highPriorityUpdate,
Long latestFlushedTime) {
SchemaCacheEntry entry;
Expand All @@ -292,7 +289,7 @@ public String[] getSecondKeyList() {

@Override
public int updateValue(int index, SchemaCacheEntry value) {
if (!shouldUpdateProvider.apply(index)) {
if (!shouldUpdateProvider.test(index)) {
return 0;
}
if (value == null) {
Expand All @@ -316,10 +313,28 @@ public int updateValue(int index, SchemaCacheEntry value) {
}
}
}

DataNodeLastCacheManager.updateLastCache(
entry, timeValuePairProvider.apply(index), highPriorityUpdate, latestFlushedTime);
}
dualKeyCache.update(
new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
@Override
public PartialPath getFirstKey() {
return devicePath;
}

@Override
public String[] getSecondKeyList() {
return missingMeasurements.stream().map(i -> measurements[i]).toArray(String[]::new);
}

@Override
public int updateValue(int index, SchemaCacheEntry value) {
return DataNodeLastCacheManager.updateLastCache(
value,
timeValuePairProvider.apply(missingMeasurements.get(index)),
highPriorityUpdate,
latestFlushedTime);
}
});
}

/**
Expand All @@ -342,16 +357,31 @@ public void updateLastCache(
entry =
new SchemaCacheEntry(
storageGroup,
(MeasurementSchema) measurementPath.getMeasurementSchema(),
measurementPath.getMeasurementSchema(),
measurementPath.getTagMap(),
measurementPath.isUnderAlignedEntity());
dualKeyCache.put(seriesPath.getDevicePath(), seriesPath.getMeasurement(), entry);
}
}
}
dualKeyCache.update(
new IDualKeyCacheUpdating<PartialPath, String, SchemaCacheEntry>() {
@Override
public PartialPath getFirstKey() {
return measurementPath.getDevicePath();
}

DataNodeLastCacheManager.updateLastCache(
entry, timeValuePair, highPriorityUpdate, latestFlushedTime);
@Override
public String[] getSecondKeyList() {
return new String[] {measurementPath.getMeasurement()};
}

@Override
public int updateValue(int index, SchemaCacheEntry value) {
return DataNodeLastCacheManager.updateLastCache(
value, timeValuePair, highPriorityUpdate, latestFlushedTime);
}
});
}

public void invalidateAll() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,22 @@ public void update(IDualKeyCacheUpdating<FK, SK, V> updating) {
if (cacheEntry == null) {
updating.updateValue(i, null);
} else {
int changeSize = updating.updateValue(i, cacheEntry.getValue());
cacheEntryManager.access(cacheEntry);
if (changeSize != 0) {
cacheStats.increaseMemoryUsage(changeSize);
if (cacheStats.isExceedMemoryCapacity()) {
executeCacheEviction(changeSize);
int changeSize = 0;
synchronized (cacheEntry) {
if (cacheEntry.getBelongedGroup() != null) {
// Only update the value when the cache entry is not evicted.
// If the cache entry is evicted, getBelongedGroup is null.
// Synchronized is to guarantee the cache entry is not evicted during the update.
changeSize = updating.updateValue(i, cacheEntry.getValue());
cacheEntryManager.access(cacheEntry);
if (changeSize != 0) {
cacheStats.increaseMemoryUsage(changeSize);
}
}
}
if (changeSize != 0 && cacheStats.isExceedMemoryCapacity()) {
executeCacheEviction(changeSize);
}
hitCount++;
}
}
Expand Down Expand Up @@ -194,31 +202,34 @@ private int evictOneCacheEntry() {
if (evictCacheEntry == null) {
return 0;
}
AtomicInteger evictedSize = new AtomicInteger(0);
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));
synchronized (evictCacheEntry) {
AtomicInteger evictedSize = new AtomicInteger(0);
evictedSize.getAndAdd(sizeComputer.computeValueSize(evictCacheEntry.getValue()));

ICacheEntryGroup<FK, SK, V, T> belongedGroup = evictCacheEntry.getBelongedGroup();
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));
ICacheEntryGroup<FK, SK, V, T> belongedGroup = evictCacheEntry.getBelongedGroup();
evictCacheEntry.setBelongedGroup(null);
belongedGroup.removeCacheEntry(evictCacheEntry.getSecondKey());
evictedSize.getAndAdd(sizeComputer.computeSecondKeySize(evictCacheEntry.getSecondKey()));

if (belongedGroup.isEmpty()) {
firstKeyMap.compute(
belongedGroup.getFirstKey(),
(firstKey, cacheEntryGroup) -> {
if (cacheEntryGroup == null) {
// has been removed by other threads
return null;
}
if (cacheEntryGroup.isEmpty()) {
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
return null;
}
if (belongedGroup.isEmpty()) {
firstKeyMap.compute(
belongedGroup.getFirstKey(),
(firstKey, cacheEntryGroup) -> {
if (cacheEntryGroup == null) {
// has been removed by other threads
return null;
}
if (cacheEntryGroup.isEmpty()) {
evictedSize.getAndAdd(sizeComputer.computeFirstKeySize(firstKey));
return null;
}

// some other thread has put value to it
return cacheEntryGroup;
});
// some other thread has put value to it
return cacheEntryGroup;
});
}
return evictedSize.get();
}
return evictedSize.get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private int getNextIndex(AtomicInteger roundRobinIndex) {
static class FIFOCacheEntry<SK, V> implements ICacheEntry<SK, V> {

private final SK secondKey;
private final ICacheEntryGroup cacheEntryGroup;
private volatile ICacheEntryGroup cacheEntryGroup;

private V value;

Expand All @@ -132,6 +132,11 @@ public ICacheEntryGroup getBelongedGroup() {
return cacheEntryGroup;
}

@Override
public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
this.cacheEntryGroup = belongedGroup;
}

@Override
public void replaceValue(V newValue) {
this.value = newValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,7 @@ interface ICacheEntry<SK, V> {

ICacheEntryGroup getBelongedGroup();

void setBelongedGroup(ICacheEntryGroup belongedGroup);

void replaceValue(V newValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private LRULinkedList getBelongedList(LRUCacheEntry<SK, V> cacheEntry) {
static class LRUCacheEntry<SK, V> implements ICacheEntry<SK, V> {

private final SK secondKey;
private final ICacheEntryGroup cacheEntryGroup;
private volatile ICacheEntryGroup cacheEntryGroup;

private V value;

Expand Down Expand Up @@ -131,6 +131,11 @@ public ICacheEntryGroup getBelongedGroup() {
return cacheEntryGroup;
}

@Override
public void setBelongedGroup(ICacheEntryGroup belongedGroup) {
this.cacheEntryGroup = belongedGroup;
}

@Override
public void replaceValue(V newValue) {
this.value = newValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static TimeValuePair getLastCache(SchemaCacheEntry entry) {
return null;
}
ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
return lastCacheContainer.getCachedLast();
return lastCacheContainer == null ? null : lastCacheContainer.getCachedLast();
}

/**
Expand All @@ -63,8 +63,6 @@ public static int updateLastCache(
if (!CACHE_ENABLED || null == entry) {
return 0;
}
ILastCacheContainer lastCacheContainer = entry.getLastCacheContainer();
return lastCacheContainer.updateCachedLast(
timeValuePair, highPriorityUpdate, latestFlushedTime);
return entry.updateLastCache(timeValuePair, highPriorityUpdate, latestFlushedTime);
}
}
Loading