Skip to content

Commit

Permalink
[IOTDB-1543] LastCache for Template and Vector (#3796)
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcosZyk committed Sep 12, 2021
1 parent a885ec9 commit 3cb08e0
Show file tree
Hide file tree
Showing 19 changed files with 1,093 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iotdb.db.metadata.MManager;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.VectorPartialPath;
import org.apache.iotdb.db.metadata.lastCache.LastCacheManager;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.metadata.mnode.InternalMNode;
Expand Down Expand Up @@ -407,26 +408,26 @@ public void updateLastCache(
PartialPath seriesPath,
TimeValuePair timeValuePair,
boolean highPriorityUpdate,
Long latestFlushedTime,
IMeasurementMNode node) {
Long latestFlushedTime) {
cacheLock.writeLock().lock();
try {
IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
measurementMNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime);
LastCacheManager.updateLastCache(
seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, measurementMNode);
}
} finally {
cacheLock.writeLock().unlock();
}
// maybe local also has the timeseries
super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node);
super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime);
}

@Override
public TimeValuePair getLastCache(PartialPath seriesPath) {
IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath);
if (measurementMNode != null) {
return measurementMNode.getCachedLast();
return LastCacheManager.getLastCache(seriesPath, measurementMNode);
}

return super.getLastCache(seriesPath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.apache.iotdb.db.exception.query.OutOfTTLException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.metadata.PartialPath;
import org.apache.iotdb.db.metadata.mnode.IMNode;
import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
import org.apache.iotdb.db.qp.physical.crud.DeletePlan;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
Expand All @@ -75,7 +74,6 @@
import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.read.TimeValuePair;
import org.apache.iotdb.tsfile.read.filter.basic.Filter;
import org.apache.iotdb.tsfile.utils.Pair;
import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
Expand Down Expand Up @@ -1090,32 +1088,34 @@ private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestF
return;
}
IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
int columnIndex = 0;
for (int i = 0; i < mNodes.length; i++) {
// Don't update cached last value for vector type
if (mNodes[i] != null && plan.isAligned()) {
columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size();
if (plan.getColumns()[i] == null) {
continue;
}
// Update cached last value with high priority
if (mNodes[i] == null) {
// no matter aligned or not, concat the path to use the full path to update LastCache
IoTDB.metaManager.updateLastCache(
plan.getPrefixPath().concatNode(plan.getMeasurements()[i]),
plan.composeLastTimeValuePair(i),
true,
latestFlushedTime);
} else {
if (plan.getColumns()[i] == null) {
columnIndex++;
continue;
}
// Update cached last value with high priority
if (mNodes[i] != null) {
// in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
// update last cache
if (plan.isAligned()) {
// vector lastCache update need subMeasurement
IoTDB.metaManager.updateLastCache(
null, plan.composeLastTimeValuePair(columnIndex), true, latestFlushedTime, mNodes[i]);
mNodes[i],
plan.getMeasurements()[i],
plan.composeLastTimeValuePair(i),
true,
latestFlushedTime);

} else {
// measurementMNodes[i] is null, use the path to update remote cache
// in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
// update last cache
IoTDB.metaManager.updateLastCache(
plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
plan.composeLastTimeValuePair(columnIndex),
true,
latestFlushedTime,
null);
mNodes[i], plan.composeLastTimeValuePair(i), true, latestFlushedTime);
}
columnIndex++;
}
}
}
Expand Down Expand Up @@ -1157,29 +1157,33 @@ private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTi
return;
}
IMeasurementMNode[] mNodes = plan.getMeasurementMNodes();
int columnIndex = 0;
for (IMeasurementMNode mNode : mNodes) {
// Don't update cached last value for vector type
if (!plan.isAligned()) {
if (plan.getValues()[columnIndex] == null) {
columnIndex++;
continue;
}
// Update cached last value with high priority
if (mNode != null) {
// in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
// update last cache
for (int i = 0; i < mNodes.length; i++) {
if (plan.getValues()[i] == null) {
continue;
}
// Update cached last value with high priority
if (mNodes[i] == null) {
// no matter aligned or not, concat the path to use the full path to update LastCache
IoTDB.metaManager.updateLastCache(
plan.getPrefixPath().concatNode(plan.getMeasurements()[i]),
plan.composeTimeValuePair(i),
true,
latestFlushedTime);
} else {
if (plan.isAligned()) {
// vector lastCache update need subSensor path
IoTDB.metaManager.updateLastCache(
null, plan.composeTimeValuePair(columnIndex), true, latestFlushedTime, mNode);
mNodes[i],
plan.getMeasurements()[i],
plan.composeTimeValuePair(i),
true,
latestFlushedTime);
} else {
// in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to
// update last cache
IoTDB.metaManager.updateLastCache(
plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]),
plan.composeTimeValuePair(columnIndex),
true,
latestFlushedTime,
null);
mNodes[i], plan.composeTimeValuePair(i), true, latestFlushedTime);
}
columnIndex++;
}
}
}
Expand Down Expand Up @@ -2045,24 +2049,7 @@ private void tryToDeleteLastCache(
return;
}
try {
IMNode node = IoTDB.metaManager.getDeviceNode(deviceId);

for (IMNode measurementNode : node.getChildren().values()) {
if (measurementNode != null
&& originalPath.matchFullPath(measurementNode.getPartialPath())) {
TimeValuePair lastPair = ((IMeasurementMNode) measurementNode).getCachedLast();
if (lastPair != null
&& startTime <= lastPair.getTimestamp()
&& lastPair.getTimestamp() <= endTime) {
((IMeasurementMNode) measurementNode).resetCache();
if (logger.isDebugEnabled()) {
logger.debug(
"[tryToDeleteLastCache] Last cache for path: {} is set to null",
measurementNode.getFullPath());
}
}
}
}
IoTDB.metaManager.deleteLastCacheByDevice(deviceId, originalPath, startTime, endTime);
} catch (MetadataException e) {
throw new WriteProcessException(e);
}
Expand Down Expand Up @@ -2395,16 +2382,7 @@ private void tryToDeleteLastCacheByDevice(PartialPath deviceId) {
return;
}
try {
IMNode node = IoTDB.metaManager.getDeviceNode(deviceId);

for (IMNode measurementNode : node.getChildren().values()) {
if (measurementNode != null) {
((IMeasurementMNode) measurementNode).resetCache();
logger.debug(
"[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null",
measurementNode.getFullPath());
}
}
IoTDB.metaManager.deleteLastCacheByDevice(deviceId);
} catch (MetadataException e) {
// the path doesn't cache in cluster mode now, ignore
}
Expand Down
Loading

0 comments on commit 3cb08e0

Please sign in to comment.