From 3cb08e0243269e4d5980942397ad2cccede081ce Mon Sep 17 00:00:00 2001 From: zyk990424 <38524330+zyk990424@users.noreply.github.com> Date: Sun, 12 Sep 2021 12:32:10 +0800 Subject: [PATCH] [IOTDB-1543] LastCache for Template and Vector (#3796) --- .../iotdb/cluster/metadata/CMManager.java | 11 +- .../storagegroup/StorageGroupProcessor.java | 116 +++--- .../apache/iotdb/db/metadata/MManager.java | 195 ++++++++++- .../org/apache/iotdb/db/metadata/MTree.java | 46 +-- .../metadata/lastCache/LastCacheManager.java | 331 ++++++++++++++++++ .../container/ILastCacheContainer.java | 58 +++ .../container/LastCacheContainer.java | 118 +++++++ .../container/value/ILastCacheValue.java | 47 +++ .../container/value/UnaryLastCacheValue.java | 106 ++++++ .../container/value/VectorLastCacheValue.java | 86 +++++ .../iotdb/db/metadata/mnode/EntityMNode.java | 25 ++ .../iotdb/db/metadata/mnode/IEntityMNode.java | 6 + .../db/metadata/mnode/IMeasurementMNode.java | 9 +- .../db/metadata/mnode/MeasurementMNode.java | 45 +-- .../iotdb/db/metadata/tag/TagManager.java | 5 +- .../db/query/executor/LastQueryExecutor.java | 40 ++- .../iotdb/db/engine/storagegroup/TTLTest.java | 4 +- .../iotdb/db/integration/IoTDBLastIT.java | 28 +- .../db/metadata/MManagerAdvancedTest.java | 16 +- 19 files changed, 1093 insertions(+), 199 deletions(-) create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java create mode 100644 server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java diff --git a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java index fa06e1299381..75ae3796c8d6 100644 --- a/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java +++ b/cluster/src/main/java/org/apache/iotdb/cluster/metadata/CMManager.java @@ -43,6 +43,7 @@ import org.apache.iotdb.db.metadata.MManager; import org.apache.iotdb.db.metadata.PartialPath; import org.apache.iotdb.db.metadata.VectorPartialPath; +import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.mnode.InternalMNode; @@ -407,26 +408,26 @@ public void updateLastCache( PartialPath seriesPath, TimeValuePair timeValuePair, boolean highPriorityUpdate, - Long latestFlushedTime, - IMeasurementMNode node) { + Long latestFlushedTime) { cacheLock.writeLock().lock(); try { IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath); if (measurementMNode != null) { - measurementMNode.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); + LastCacheManager.updateLastCache( + seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, measurementMNode); } } finally { cacheLock.writeLock().unlock(); } // maybe local also has the timeseries - super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node); + super.updateLastCache(seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime); } @Override public TimeValuePair getLastCache(PartialPath seriesPath) { IMeasurementMNode measurementMNode = mRemoteMetaCache.get(seriesPath); if (measurementMNode != null) { - return measurementMNode.getCachedLast(); + return LastCacheManager.getLastCache(seriesPath, measurementMNode); } return super.getLastCache(seriesPath); diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java index a5b3933b8c6e..9ded4e6645ca 100755 --- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java @@ -55,7 +55,6 @@ import org.apache.iotdb.db.exception.query.OutOfTTLException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.DeletePlan; import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan; @@ -75,7 +74,6 @@ import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata; import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer; import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.read.filter.basic.Filter; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; @@ -1090,32 +1088,34 @@ private void tryToUpdateBatchInsertLastCache(InsertTabletPlan plan, Long latestF return; } IMeasurementMNode[] mNodes = plan.getMeasurementMNodes(); - int columnIndex = 0; for (int i = 0; i < mNodes.length; i++) { - // Don't update cached last value for vector type - if (mNodes[i] != null && plan.isAligned()) { - columnIndex += mNodes[i].getSchema().getValueMeasurementIdList().size(); + if (plan.getColumns()[i] == null) { + continue; + } + // Update cached last value with high priority + if (mNodes[i] == null) { + // no matter aligned or not, concat the path to use the full path to update LastCache + IoTDB.metaManager.updateLastCache( + plan.getPrefixPath().concatNode(plan.getMeasurements()[i]), + plan.composeLastTimeValuePair(i), + true, + latestFlushedTime); } else { - if (plan.getColumns()[i] == null) { - columnIndex++; - continue; - } - // Update cached last value with high priority - if (mNodes[i] != null) { - // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to - // update last cache + if (plan.isAligned()) { + // vector lastCache update need subMeasurement IoTDB.metaManager.updateLastCache( - null, plan.composeLastTimeValuePair(columnIndex), true, latestFlushedTime, mNodes[i]); + mNodes[i], + plan.getMeasurements()[i], + plan.composeLastTimeValuePair(i), + true, + latestFlushedTime); + } else { - // measurementMNodes[i] is null, use the path to update remote cache + // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to + // update last cache IoTDB.metaManager.updateLastCache( - plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]), - plan.composeLastTimeValuePair(columnIndex), - true, - latestFlushedTime, - null); + mNodes[i], plan.composeLastTimeValuePair(i), true, latestFlushedTime); } - columnIndex++; } } } @@ -1157,29 +1157,33 @@ private void tryToUpdateInsertLastCache(InsertRowPlan plan, Long latestFlushedTi return; } IMeasurementMNode[] mNodes = plan.getMeasurementMNodes(); - int columnIndex = 0; - for (IMeasurementMNode mNode : mNodes) { - // Don't update cached last value for vector type - if (!plan.isAligned()) { - if (plan.getValues()[columnIndex] == null) { - columnIndex++; - continue; - } - // Update cached last value with high priority - if (mNode != null) { - // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to - // update last cache + for (int i = 0; i < mNodes.length; i++) { + if (plan.getValues()[i] == null) { + continue; + } + // Update cached last value with high priority + if (mNodes[i] == null) { + // no matter aligned or not, concat the path to use the full path to update LastCache + IoTDB.metaManager.updateLastCache( + plan.getPrefixPath().concatNode(plan.getMeasurements()[i]), + plan.composeTimeValuePair(i), + true, + latestFlushedTime); + } else { + if (plan.isAligned()) { + // vector lastCache update need subSensor path IoTDB.metaManager.updateLastCache( - null, plan.composeTimeValuePair(columnIndex), true, latestFlushedTime, mNode); + mNodes[i], + plan.getMeasurements()[i], + plan.composeTimeValuePair(i), + true, + latestFlushedTime); } else { + // in stand alone version, the seriesPath is not needed, just use measurementMNodes[i] to + // update last cache IoTDB.metaManager.updateLastCache( - plan.getPrefixPath().concatNode(plan.getMeasurements()[columnIndex]), - plan.composeTimeValuePair(columnIndex), - true, - latestFlushedTime, - null); + mNodes[i], plan.composeTimeValuePair(i), true, latestFlushedTime); } - columnIndex++; } } } @@ -2045,24 +2049,7 @@ private void tryToDeleteLastCache( return; } try { - IMNode node = IoTDB.metaManager.getDeviceNode(deviceId); - - for (IMNode measurementNode : node.getChildren().values()) { - if (measurementNode != null - && originalPath.matchFullPath(measurementNode.getPartialPath())) { - TimeValuePair lastPair = ((IMeasurementMNode) measurementNode).getCachedLast(); - if (lastPair != null - && startTime <= lastPair.getTimestamp() - && lastPair.getTimestamp() <= endTime) { - ((IMeasurementMNode) measurementNode).resetCache(); - if (logger.isDebugEnabled()) { - logger.debug( - "[tryToDeleteLastCache] Last cache for path: {} is set to null", - measurementNode.getFullPath()); - } - } - } - } + IoTDB.metaManager.deleteLastCacheByDevice(deviceId, originalPath, startTime, endTime); } catch (MetadataException e) { throw new WriteProcessException(e); } @@ -2395,16 +2382,7 @@ private void tryToDeleteLastCacheByDevice(PartialPath deviceId) { return; } try { - IMNode node = IoTDB.metaManager.getDeviceNode(deviceId); - - for (IMNode measurementNode : node.getChildren().values()) { - if (measurementNode != null) { - ((IMeasurementMNode) measurementNode).resetCache(); - logger.debug( - "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null", - measurementNode.getFullPath()); - } - } + IoTDB.metaManager.deleteLastCacheByDevice(deviceId); } catch (MetadataException e) { // the path doesn't cache in cluster mode now, ignore } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java index 9368523c213a..5ea0477047a4 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MManager.java @@ -24,6 +24,7 @@ import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; import org.apache.iotdb.db.engine.trigger.executor.TriggerEngine; import org.apache.iotdb.db.exception.metadata.*; +import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.logfile.MLogReader; import org.apache.iotdb.db.metadata.logfile.MLogWriter; import org.apache.iotdb.db.metadata.mnode.*; @@ -1661,32 +1662,198 @@ public void cacheMeta( // do nothing } + /** + * Update the last cache value of time series of given seriesPath. + * + *

MManager will use the seriesPath to search the node first and then process the lastCache in + * the MeasurementMNode + * + *

Invoking scenario: (1) after executing insertPlan (2) after reading last value from file + * during last Query + * + * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries + * @param timeValuePair the latest point value + * @param highPriorityUpdate the last value from insertPlan is high priority + * @param latestFlushedTime latest flushed time + */ public void updateLastCache( PartialPath seriesPath, TimeValuePair timeValuePair, boolean highPriorityUpdate, - Long latestFlushedTime, - IMeasurementMNode node) { - if (node != null) { - node.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); - } else { - try { - IMeasurementMNode node1 = (IMeasurementMNode) mtree.getNodeByPath(seriesPath); - node1.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); - } catch (MetadataException e) { - logger.warn("failed to update last cache for the {}, err:{}", seriesPath, e.getMessage()); - } + Long latestFlushedTime) { + IMeasurementMNode node; + try { + node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath); + } catch (MetadataException e) { + logger.warn("failed to update last cache for the {}, err:{}", seriesPath, e.getMessage()); + return; } + + LastCacheManager.updateLastCache( + seriesPath, timeValuePair, highPriorityUpdate, latestFlushedTime, node); } + /** + * Update the last cache value in given unary MeasurementMNode. Vector lastCache operation won't + * work. + * + *

Invoking scenario: (1) after executing insertPlan (2) after reading last value from file + * during last Query + * + * @param node the measurementMNode holding the lastCache, must be unary measurement + * @param timeValuePair the latest point value + * @param highPriorityUpdate the last value from insertPlan is high priority + * @param latestFlushedTime latest flushed time + */ + public void updateLastCache( + IMeasurementMNode node, + TimeValuePair timeValuePair, + boolean highPriorityUpdate, + Long latestFlushedTime) { + if (node.getSchema() instanceof VectorMeasurementSchema) { + throw new UnsupportedOperationException("Must provide subMeasurement for vector measurement"); + } + LastCacheManager.updateLastCache( + node.getPartialPath(), timeValuePair, highPriorityUpdate, latestFlushedTime, node); + } + + /** + * Update the last cache value of subMeasurement given Vector MeasurementMNode. + * + *

Invoking scenario: (1) after executing insertPlan (2) after reading last value from file + * during last Query + * + * @param node the measurementMNode holding the lastCache + * @param subMeasurement the subMeasurement of aligned timeseries + * @param timeValuePair the latest point value + * @param highPriorityUpdate the last value from insertPlan is high priority + * @param latestFlushedTime latest flushed time + */ + public void updateLastCache( + IMeasurementMNode node, + String subMeasurement, + TimeValuePair timeValuePair, + boolean highPriorityUpdate, + Long latestFlushedTime) { + if (!(node.getSchema() instanceof VectorMeasurementSchema)) { + throw new UnsupportedOperationException( + "Can't update lastCache of subMeasurement in unary measurement"); + } + LastCacheManager.updateLastCache( + node.getPartialPath().concatNode(subMeasurement), + timeValuePair, + highPriorityUpdate, + latestFlushedTime, + node); + } + + /** + * Get the last cache value of time series of given seriesPath. MManager will use the seriesPath + * to search the node. + * + *

Invoking scenario: last cache read during last Query + * + * @param seriesPath the full path from root to measurement of timeseries or subMeasurement of + * aligned timeseries + * @return the last cache value + */ public TimeValuePair getLastCache(PartialPath seriesPath) { + IMeasurementMNode node; try { - IMeasurementMNode node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath); - return node.getCachedLast(); + node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath); } catch (MetadataException e) { logger.warn("failed to get last cache for the {}, err:{}", seriesPath, e.getMessage()); + return null; + } + + return LastCacheManager.getLastCache(seriesPath, node); + } + + /** + * Get the last cache value in given unary MeasurementMNode. Vector case won't work. + * + *

Invoking scenario: last cache read during last Query + * + * @param node the measurementMNode holding the lastCache, must be unary measurement + * @return the last cache value + */ + public TimeValuePair getLastCache(IMeasurementMNode node) { + if (node.getSchema() instanceof VectorMeasurementSchema) { + throw new UnsupportedOperationException("Must provide subMeasurement for vector measurement"); + } + return LastCacheManager.getLastCache(node.getPartialPath(), node); + } + + /** + * Get the last cache value of given subMeasurement of given MeasurementMNode. Must be Vector + * case. + * + *

Invoking scenario: last cache read during last Query + * + * @param node the measurementMNode holding the lastCache + * @param subMeasurement the subMeasurement of aligned timeseries + * @return the last cache value + */ + public TimeValuePair getLastCache(IMeasurementMNode node, String subMeasurement) { + if (!(node.getSchema() instanceof VectorMeasurementSchema)) { + throw new UnsupportedOperationException( + "Can't get lastCache of subMeasurement from unary measurement"); + } + return LastCacheManager.getLastCache(node.getPartialPath().concatNode(subMeasurement), node); + } + + /** + * Reset the last cache value of time series of given seriesPath. MManager will use the seriesPath + * to search the node. + * + * @param seriesPath the path from root to measurement of timeseries or subMeasurement of aligned + * timeseries + */ + public void resetLastCache(PartialPath seriesPath) { + IMeasurementMNode node; + try { + node = (IMeasurementMNode) mtree.getNodeByPath(seriesPath); + } catch (MetadataException e) { + logger.warn("failed to reset last cache for the {}, err:{}", seriesPath, e.getMessage()); + return; + } + + LastCacheManager.resetLastCache(seriesPath, node); + } + + /** + * delete all the last cache value of any timeseries or aligned timeseries under the device + * + *

Invoking scenario (1) after upload tsfile + * + * @param deviceId path of device + */ + public void deleteLastCacheByDevice(PartialPath deviceId) throws MetadataException { + IMNode node = getDeviceNode(deviceId); + if (node.isEntity()) { + LastCacheManager.deleteLastCacheByDevice((IEntityMNode) node); + } + } + + /** + * delete the last cache value of timeseries or subMeasurement of some aligned timeseries, which + * is under the device and matching the originalPath + * + *

Invoking scenario (1) delete timeseries + * + * @param deviceId path of device + * @param originalPath origin timeseries path + * @param startTime startTime + * @param endTime endTime + */ + public void deleteLastCacheByDevice( + PartialPath deviceId, PartialPath originalPath, long startTime, long endTime) + throws MetadataException { + IMNode node = IoTDB.metaManager.getDeviceNode(deviceId); + if (node.isEntity()) { + LastCacheManager.deleteLastCacheByDevice( + (IEntityMNode) node, originalPath, startTime, endTime); } - return null; } /** get schema for device. Attention!!! Only support insertPlan */ diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java index 755605fd2ca5..e6d69f4e58e2 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/MTree.java @@ -21,7 +21,6 @@ import org.apache.iotdb.db.conf.IoTDBConstant; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory; -import org.apache.iotdb.db.engine.querycontext.QueryDataSource; import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException; import org.apache.iotdb.db.exception.metadata.IllegalPathException; import org.apache.iotdb.db.exception.metadata.MetadataException; @@ -30,6 +29,7 @@ import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException; import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException; import org.apache.iotdb.db.metadata.MManager.StorageGroupFilter; +import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.logfile.MLogReader; import org.apache.iotdb.db.metadata.logfile.MLogWriter; import org.apache.iotdb.db.metadata.mnode.IEntityMNode; @@ -49,14 +49,11 @@ import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; import org.apache.iotdb.db.qp.physical.sys.StorageGroupMNodePlan; import org.apache.iotdb.db.query.context.QueryContext; -import org.apache.iotdb.db.query.control.QueryResourceManager; import org.apache.iotdb.db.query.dataset.ShowDevicesResult; -import org.apache.iotdb.db.query.executor.fill.LastPointReader; import org.apache.iotdb.db.utils.TestOnly; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.utils.Pair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -176,38 +173,6 @@ public void createSnapshot() throws IOException { } } - public static long getLastTimeStamp(IMeasurementMNode node, QueryContext queryContext) { - TimeValuePair last = node.getCachedLast(); - if (last != null) { - return node.getCachedLast().getTimestamp(); - } else { - try { - QueryDataSource dataSource = - QueryResourceManager.getInstance() - .getQueryDataSource(node.getPartialPath(), queryContext, null); - Set measurementSet = new HashSet<>(); - measurementSet.add(node.getPartialPath().getFullPath()); - LastPointReader lastReader = - new LastPointReader( - node.getPartialPath(), - node.getSchema().getType(), - measurementSet, - queryContext, - dataSource, - Long.MAX_VALUE, - null); - last = lastReader.readLastPoint(); - return (last != null ? last.getTimestamp() : Long.MIN_VALUE); - } catch (Exception e) { - logger.error( - "Something wrong happened while trying to get last time value pair of {}", - node.getFullPath(), - e); - return Long.MIN_VALUE; - } - } - } - private static String jsonToString(JsonObject jsonObject) { return GSON.toJson(jsonObject); } @@ -1395,7 +1360,8 @@ private void addMeasurementSchema( tsRow[5] = String.valueOf(((IMeasurementMNode) node).getOffset()); tsRow[6] = needLast - ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext)) + ? String.valueOf( + LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext)) : null; Pair temp = new Pair<>(nodePath, tsRow); timeseriesSchemaList.add(temp); @@ -1425,7 +1391,8 @@ private void addVectorMeasurementSchema( tsRow[5] = "-1"; tsRow[6] = needLast - ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext)) + ? String.valueOf( + LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext)) : null; Pair temp = new Pair<>(new PartialPath(devicePath.getFullPath(), measurements.get(i)), tsRow); @@ -1458,7 +1425,8 @@ private void addVectorMeasurementSchemaForTemplate( tsRow[5] = "-1"; tsRow[6] = needLast - ? String.valueOf(getLastTimeStamp((IMeasurementMNode) node, queryContext)) + ? String.valueOf( + LastCacheManager.getLastTimeStamp((IMeasurementMNode) node, queryContext)) : null; Pair temp = new Pair<>(new PartialPath(devicePath.getFullPath(), measurements.get(i)), tsRow); diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java new file mode 100644 index 000000000000..031b1408306f --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/LastCacheManager.java @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache; + +import org.apache.iotdb.db.engine.querycontext.QueryDataSource; +import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; +import org.apache.iotdb.db.metadata.mnode.IEntityMNode; +import org.apache.iotdb.db.metadata.mnode.IMNode; +import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; +import org.apache.iotdb.db.metadata.template.Template; +import org.apache.iotdb.db.query.context.QueryContext; +import org.apache.iotdb.db.query.control.QueryResourceManager; +import org.apache.iotdb.db.query.executor.fill.LastPointReader; +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; +import org.apache.iotdb.tsfile.write.schema.VectorMeasurementSchema; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +// this class provides all the operations on last cache +public class LastCacheManager { + + private static final Logger logger = LoggerFactory.getLogger(LastCacheManager.class); + + /** + * get the last cache value of time series of given seriesPath + * + * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries + * @param node the measurementMNode holding the lastCache When invoker only has the target + * seriesPath, the node could be null and MManager will search the node + * @return the last cache value + */ + public static TimeValuePair getLastCache(PartialPath seriesPath, IMeasurementMNode node) { + if (node == null) { + return null; + } + + checkIsTemplateLastCacheAndSetIfAbsent(node); + + ILastCacheContainer lastCacheContainer = node.getLastCacheContainer(); + if (seriesPath == null) { + return lastCacheContainer.getCachedLast(); + } else { + String measurementId = seriesPath.getMeasurement(); + if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) { + return lastCacheContainer.getCachedLast(); + } else { + IMeasurementSchema schema = node.getSchema(); + if (schema instanceof VectorMeasurementSchema) { + return lastCacheContainer.getCachedLast( + schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement())); + } + return null; + } + } + } + + /** + * update the last cache value of time series of given seriesPath + * + * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries + * @param timeValuePair the latest point value + * @param highPriorityUpdate the last value from insertPlan is high priority + * @param latestFlushedTime latest flushed time + * @param node the measurementMNode holding the lastCache When invoker only has the target + * seriesPath, the node could be null and MManager will search the node + */ + public static void updateLastCache( + PartialPath seriesPath, + TimeValuePair timeValuePair, + boolean highPriorityUpdate, + Long latestFlushedTime, + IMeasurementMNode node) { + if (node == null) { + return; + } + + checkIsTemplateLastCacheAndSetIfAbsent(node); + + ILastCacheContainer lastCacheContainer = node.getLastCacheContainer(); + if (seriesPath == null) { + lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); + } else { + String measurementId = seriesPath.getMeasurement(); + if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) { + lastCacheContainer.updateCachedLast(timeValuePair, highPriorityUpdate, latestFlushedTime); + } else { + IMeasurementSchema schema = node.getSchema(); + if (schema instanceof VectorMeasurementSchema) { + if (lastCacheContainer.isEmpty()) { + lastCacheContainer.init(schema.getMeasurementCount()); + } + lastCacheContainer.updateCachedLast( + schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement()), + timeValuePair, + highPriorityUpdate, + latestFlushedTime); + } + } + } + } + + /** + * reset the last cache value of time series of given seriesPath + * + * @param seriesPath the path of timeseries or subMeasurement of aligned timeseries + * @param node the measurementMNode holding the lastCache When invoker only has the target + * seriesPath, the node could be null and MManager will search the node + */ + public static void resetLastCache(PartialPath seriesPath, IMeasurementMNode node) { + if (node == null) { + return; + } + + checkIsTemplateLastCacheAndSetIfAbsent(node); + + ILastCacheContainer lastCacheContainer = node.getLastCacheContainer(); + if (seriesPath == null) { + lastCacheContainer.resetLastCache(); + } else { + String measurementId = seriesPath.getMeasurement(); + if (measurementId.equals(node.getName()) || measurementId.equals(node.getAlias())) { + lastCacheContainer.resetLastCache(); + } else { + IMeasurementSchema schema = node.getSchema(); + if (schema instanceof VectorMeasurementSchema) { + if (lastCacheContainer.isEmpty()) { + lastCacheContainer.init(schema.getMeasurementCount()); + } + lastCacheContainer.resetLastCache( + schema.getMeasurementIdColumnIndex(seriesPath.getMeasurement())); + } + } + } + } + + private static void checkIsTemplateLastCacheAndSetIfAbsent(IMeasurementMNode node) { + IEntityMNode entityMNode = node.getParent(); + if (entityMNode == null) { + // cluster cached remote measurementMNode doesn't have parent + return; + } + String measurement = node.getName(); + + // if entityMNode doesn't have this child, the child is derived from template + if (!entityMNode.hasChild(measurement)) { + ILastCacheContainer lastCacheContainer = entityMNode.getLastCacheContainer(measurement); + IMeasurementSchema schema = node.getSchema(); + if (lastCacheContainer.isEmpty() && (schema instanceof VectorMeasurementSchema)) { + lastCacheContainer.init(schema.getMeasurementCount()); + } + node.setLastCacheContainer(lastCacheContainer); + } + } + + /** + * delete all the last cache value of any timeseries or aligned timeseries under the entity + * + * @param node entity node + */ + public static void deleteLastCacheByDevice(IEntityMNode node) { + // process lastCache of timeseries represented by measurementNode + for (IMNode measurementNode : node.getChildren().values()) { + if (measurementNode != null) { + ((IMeasurementMNode) measurementNode).getLastCacheContainer().resetLastCache(); + if (logger.isDebugEnabled()) { + logger.debug( + "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null", + measurementNode.getFullPath()); + } + } + } + // process lastCache of timeseries represented by template + for (Map.Entry entry : node.getTemplateLastCaches().entrySet()) { + entry.getValue().resetLastCache(); + if (logger.isDebugEnabled()) { + logger.debug( + "[tryToDeleteLastCacheByDevice] Last cache for path: {} is set to null", + node.getPartialPath().concatNode(entry.getKey()).getFullPath()); + } + } + } + + /** + * delete the last cache value of timeseries or subMeasurement of some aligned timeseries, which + * is under the entity and matching the originalPath + * + * @param node entity node + * @param originalPath origin timeseries path + * @param startTime startTime + * @param endTime endTime + */ + public static void deleteLastCacheByDevice( + IEntityMNode node, PartialPath originalPath, long startTime, long endTime) { + PartialPath path; + IMeasurementSchema schema; + ILastCacheContainer lastCacheContainer; + + // process lastCache of timeseries represented by measurementNode + IMeasurementMNode measurementMNode; + for (IMNode child : node.getChildren().values()) { + if (child == null || !child.isMeasurement()) { + continue; + } + path = child.getPartialPath(); + measurementMNode = (IMeasurementMNode) child; + if (originalPath.matchFullPath(path)) { + lastCacheContainer = measurementMNode.getLastCacheContainer(); + if (lastCacheContainer == null) { + continue; + } + schema = measurementMNode.getSchema(); + deleteLastCache(path, schema, lastCacheContainer, startTime, endTime); + } + } + + // process lastCache of timeseries represented by template + Template template = node.getUpperTemplate(); + for (Map.Entry entry : node.getTemplateLastCaches().entrySet()) { + path = node.getPartialPath().concatNode(entry.getKey()); + if (originalPath.matchFullPath(path)) { + lastCacheContainer = entry.getValue(); + if (lastCacheContainer == null) { + continue; + } + schema = template.getSchemaMap().get(entry.getKey()); + deleteLastCache(path, schema, lastCacheContainer, startTime, endTime); + } + } + } + + private static void deleteLastCache( + PartialPath path, + IMeasurementSchema schema, + ILastCacheContainer lastCacheContainer, + long startTime, + long endTime) { + TimeValuePair lastPair; + if (schema instanceof VectorMeasurementSchema) { + int index; + for (String measurement : schema.getValueMeasurementIdList()) { + index = schema.getMeasurementIdColumnIndex(measurement); + lastPair = lastCacheContainer.getCachedLast(index); + if (lastPair != null + && startTime <= lastPair.getTimestamp() + && lastPair.getTimestamp() <= endTime) { + lastCacheContainer.resetLastCache(index); + if (logger.isDebugEnabled()) { + logger.debug( + "[tryToDeleteLastCache] Last cache for path: {} is set to null", + path.concatNode(measurement).getFullPath()); + } + } + } + } else { + lastPair = lastCacheContainer.getCachedLast(); + if (lastPair != null + && startTime <= lastPair.getTimestamp() + && lastPair.getTimestamp() <= endTime) { + lastCacheContainer.resetLastCache(); + if (logger.isDebugEnabled()) { + logger.debug( + "[tryToDeleteLastCache] Last cache for path: {} is set to null", path.getFullPath()); + } + } + } + } + + /** + * get the last value of timeseries represented by given measurementMNode get last value from + * cache in measurementMNode if absent, get last value from file + * + * @param node measurementMNode representing the target timeseries + * @param queryContext query context + * @return the last value + */ + public static long getLastTimeStamp(IMeasurementMNode node, QueryContext queryContext) { + TimeValuePair last = getLastCache(null, node); + if (last != null) { + return getLastCache(null, node).getTimestamp(); + } else { + try { + QueryDataSource dataSource = + QueryResourceManager.getInstance() + .getQueryDataSource(node.getPartialPath(), queryContext, null); + Set measurementSet = new HashSet<>(); + measurementSet.add(node.getPartialPath().getFullPath()); + LastPointReader lastReader = + new LastPointReader( + node.getPartialPath(), + node.getSchema().getType(), + measurementSet, + queryContext, + dataSource, + Long.MAX_VALUE, + null); + last = lastReader.readLastPoint(); + return (last != null ? last.getTimestamp() : Long.MIN_VALUE); + } catch (Exception e) { + logger.error( + "Something wrong happened while trying to get last time value pair of {}", + node.getFullPath(), + e); + return Long.MIN_VALUE; + } + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java new file mode 100644 index 000000000000..3767b51fd645 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/ILastCacheContainer.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache.container; + +import org.apache.iotdb.tsfile.read.TimeValuePair; + +/** this interface declares the operations of LastCache data */ +public interface ILastCacheContainer { + + // if vector, entry need schema size to init LastCache Value list + void init(int size); + + // get lastCache of monad timseries + TimeValuePair getCachedLast(); + + // get lastCache of vector timseries + TimeValuePair getCachedLast(int index); + + /** + * update last point cache + * + * @param timeValuePair last point + * @param highPriorityUpdate whether it's a high priority update + * @param latestFlushedTime latest flushed time + */ + void updateCachedLast( + TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime); + + // update lastCache for vector timseries + void updateCachedLast( + int index, TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime); + + // reset all lastCache data of one timeseries(monad or vector) + void resetLastCache(); + + // reset lastCache of vector's subsensor + void resetLastCache(int index); + + // whether the entry contains lastCache Value. + boolean isEmpty(); +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java new file mode 100644 index 000000000000..8ba9a9902c4a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/LastCacheContainer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache.container; + +import org.apache.iotdb.db.metadata.lastCache.container.value.ILastCacheValue; +import org.apache.iotdb.db.metadata.lastCache.container.value.UnaryLastCacheValue; +import org.apache.iotdb.db.metadata.lastCache.container.value.VectorLastCacheValue; +import org.apache.iotdb.tsfile.read.TimeValuePair; + +/** + * This class possesses the ILastCacheValue and implements the basic last cache operations. + * + *

The ILastCacheValue may be extended to ILastCacheValue List in future to support batched last + * value cache. + */ +public class LastCacheContainer implements ILastCacheContainer { + + ILastCacheValue lastCacheValue; + + @Override + public void init(int size) { + if (size > 1) { + lastCacheValue = new VectorLastCacheValue(size); + } + } + + @Override + public TimeValuePair getCachedLast() { + return lastCacheValue == null ? null : lastCacheValue.getTimeValuePair(); + } + + @Override + public TimeValuePair getCachedLast(int index) { + return lastCacheValue == null ? null : lastCacheValue.getTimeValuePair(index); + } + + @Override + public synchronized void updateCachedLast( + TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { + if (timeValuePair == null || timeValuePair.getValue() == null) { + return; + } + + if (lastCacheValue == null) { + // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will + // update cache. + if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) { + lastCacheValue = + new UnaryLastCacheValue(timeValuePair.getTimestamp(), timeValuePair.getValue()); + } + } else if (timeValuePair.getTimestamp() > lastCacheValue.getTimestamp() + || (timeValuePair.getTimestamp() == lastCacheValue.getTimestamp() && highPriorityUpdate)) { + lastCacheValue.setTimestamp(timeValuePair.getTimestamp()); + lastCacheValue.setValue(timeValuePair.getValue()); + } + } + + @Override + public synchronized void updateCachedLast( + int index, TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { + if (timeValuePair == null || timeValuePair.getValue() == null) { + return; + } + + if (lastCacheValue.getTimeValuePair(index) == null) { + // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will + // update cache. + if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) { + lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp()); + lastCacheValue.setValue(index, timeValuePair.getValue()); + } + } else if (timeValuePair.getTimestamp() > lastCacheValue.getTimestamp(index)) { + lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp()); + lastCacheValue.setValue(index, timeValuePair.getValue()); + } else if (timeValuePair.getTimestamp() == lastCacheValue.getTimestamp(index)) { + if (highPriorityUpdate || lastCacheValue.getValue(index) == null) { + lastCacheValue.setTimestamp(index, timeValuePair.getTimestamp()); + lastCacheValue.setValue(index, timeValuePair.getValue()); + } + } + } + + @Override + public synchronized void resetLastCache() { + lastCacheValue = null; + } + + @Override + public void resetLastCache(int index) { + if (lastCacheValue instanceof VectorLastCacheValue) { + lastCacheValue.setValue(index, null); + } else { + lastCacheValue = null; + } + } + + @Override + public boolean isEmpty() { + return lastCacheValue == null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java new file mode 100644 index 000000000000..e4fc71683ae5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/ILastCacheValue.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache.container.value; + +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +// this interface declares the simplest storage operation of lastCacheValue +public interface ILastCacheValue { + + long getTimestamp(); + + void setTimestamp(long timestamp); + + void setValue(TsPrimitiveType value); + + TimeValuePair getTimeValuePair(); + + int getSize(); + + long getTimestamp(int index); + + void setTimestamp(int index, long timestamp); + + TsPrimitiveType getValue(int index); + + void setValue(int index, TsPrimitiveType value); + + TimeValuePair getTimeValuePair(int index); +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java new file mode 100644 index 000000000000..b8f21ce8875a --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/UnaryLastCacheValue.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache.container.value; + +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +public class UnaryLastCacheValue implements ILastCacheValue { + + private static final String INDEX_OPERATION_ON_MONAD_EXCEPTION = + "Cannot operate data on any index but 0 on MonadLastCacheValue"; + + private long timestamp; + + private TsPrimitiveType value; + + public UnaryLastCacheValue(long timestamp, TsPrimitiveType value) { + this.timestamp = timestamp; + this.value = value; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Override + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + @Override + public void setValue(TsPrimitiveType value) { + this.value = value; + } + + @Override + public TimeValuePair getTimeValuePair() { + return new TimeValuePair(timestamp, value); + } + + @Override + public int getSize() { + return 1; + } + + @Override + public long getTimestamp(int index) { + if (index == 0) { + return timestamp; + } + throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION); + } + + @Override + public void setTimestamp(int index, long timestamp) { + if (index == 0) { + this.timestamp = timestamp; + } + throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION); + } + + @Override + public TsPrimitiveType getValue(int index) { + if (index == 0) { + return value; + } + throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION); + } + + @Override + public void setValue(int index, TsPrimitiveType value) { + if (index == 0) { + this.value = value; + } + throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION); + } + + @Override + public TimeValuePair getTimeValuePair(int index) { + if (index != 0) { + throw new RuntimeException(INDEX_OPERATION_ON_MONAD_EXCEPTION); + } else if (value == null) { + return null; + } else { + return new TimeValuePair(timestamp, value); + } + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java new file mode 100644 index 000000000000..d5a3adad0031 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/metadata/lastCache/container/value/VectorLastCacheValue.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.metadata.lastCache.container.value; + +import org.apache.iotdb.tsfile.read.TimeValuePair; +import org.apache.iotdb.tsfile.utils.TsPrimitiveType; + +// this class defines the storage of vector lastCache data +public class VectorLastCacheValue implements ILastCacheValue { + + // the last point data of different subSensors may vary from each other on timestamp + private long[] timestamps; + + private TsPrimitiveType[] values; + + public VectorLastCacheValue(int size) { + timestamps = new long[size]; + values = new TsPrimitiveType[size]; + } + + @Override + public int getSize() { + return values.length; + } + + @Override + public long getTimestamp(int index) { + return timestamps[index]; + } + + @Override + public void setTimestamp(int index, long timestamp) { + timestamps[index] = timestamp; + } + + @Override + public TsPrimitiveType getValue(int index) { + return values == null ? null : values[index]; + } + + @Override + public void setValue(int index, TsPrimitiveType value) { + values[index] = value; + } + + @Override + public TimeValuePair getTimeValuePair(int index) { + if (values == null || index < 0 || index >= values.length || values[index] == null) { + return null; + } + return new TimeValuePair(timestamps[index], values[index]); + } + + @Override + public long getTimestamp() { + return 0; + } + + @Override + public void setTimestamp(long timestamp) {} + + @Override + public void setValue(TsPrimitiveType value) {} + + @Override + public TimeValuePair getTimeValuePair() { + return null; + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java index 93c379132a8b..11e7f915ba40 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/EntityMNode.java @@ -18,6 +18,9 @@ */ package org.apache.iotdb.db.metadata.mnode; +import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; +import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer; + import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,6 +37,8 @@ public class EntityMNode extends InternalMNode implements IEntityMNode { private volatile boolean useTemplate = false; + private volatile Map lastCacheMap = null; + /** * Constructor of MNode. * @@ -110,6 +115,26 @@ public void setUseTemplate(boolean useTemplate) { this.useTemplate = useTemplate; } + public ILastCacheContainer getLastCacheContainer(String measurementId) { + checkLastCacheMap(); + return lastCacheMap.computeIfAbsent(measurementId, k -> new LastCacheContainer()); + } + + @Override + public Map getTemplateLastCaches() { + return lastCacheMap == null ? Collections.emptyMap() : lastCacheMap; + } + + private void checkLastCacheMap() { + if (lastCacheMap == null) { + synchronized (this) { + if (lastCacheMap == null) { + lastCacheMap = new ConcurrentHashMap<>(); + } + } + } + } + @Override public boolean isEntity() { return true; diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java index 5b1369aaaadb..98368e939430 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IEntityMNode.java @@ -18,6 +18,8 @@ */ package org.apache.iotdb.db.metadata.mnode; +import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; + import java.util.Map; public interface IEntityMNode extends IMNode { @@ -34,6 +36,10 @@ public interface IEntityMNode extends IMNode { void setUseTemplate(boolean useTemplate); + ILastCacheContainer getLastCacheContainer(String measurementId); + + Map getTemplateLastCaches(); + static IEntityMNode setToEntity(IMNode node) { IEntityMNode entityMNode; if (node.isEntity()) { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java index 6a4d9f0f356f..2f7ae732e054 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/IMeasurementMNode.java @@ -19,8 +19,8 @@ package org.apache.iotdb.db.metadata.mnode; import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor; +import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; /** This interface defines a MeasurementMNode's operation interfaces. */ @@ -49,10 +49,7 @@ public interface IMeasurementMNode extends IMNode { void setTriggerExecutor(TriggerExecutor triggerExecutor); - TimeValuePair getCachedLast(); + ILastCacheContainer getLastCacheContainer(); - void updateCachedLast( - TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime); - - void resetCache(); + void setLastCacheContainer(ILastCacheContainer lastCacheContainer); } diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java index 27480e8656c3..1f9430a5f8e9 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/mnode/MeasurementMNode.java @@ -20,13 +20,14 @@ import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer; +import org.apache.iotdb.db.metadata.lastCache.container.LastCacheContainer; import org.apache.iotdb.db.metadata.logfile.MLogWriter; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.qp.physical.sys.MeasurementMNodePlan; import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding; -import org.apache.iotdb.tsfile.read.TimeValuePair; import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema; import org.apache.iotdb.tsfile.write.schema.MeasurementSchema; @@ -55,7 +56,7 @@ public class MeasurementMNode extends MNode implements IMeasurementMNode { private long offset = -1; /** last value cache */ - private TimeValuePair cachedLastValuePair = null; + private volatile ILastCacheContainer lastCacheContainer = null; /** registered trigger */ private TriggerExecutor triggerExecutor = null; @@ -153,42 +154,20 @@ public void setTriggerExecutor(TriggerExecutor triggerExecutor) { } @Override - public TimeValuePair getCachedLast() { - return cachedLastValuePair; - } - - /** - * update last point cache - * - * @param timeValuePair last point - * @param highPriorityUpdate whether it's a high priority update - * @param latestFlushedTime latest flushed time - */ - @Override - public synchronized void updateCachedLast( - TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) { - if (timeValuePair == null || timeValuePair.getValue() == null) { - return; - } - - if (cachedLastValuePair == null) { - // If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will - // update cache. - if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) { - cachedLastValuePair = - new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue()); + public ILastCacheContainer getLastCacheContainer() { + if (lastCacheContainer == null) { + synchronized (this) { + if (lastCacheContainer == null) { + lastCacheContainer = new LastCacheContainer(); + } } - } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp() - || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp() - && highPriorityUpdate)) { - cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp()); - cachedLastValuePair.setValue(timeValuePair.getValue()); } + return lastCacheContainer; } @Override - public void resetCache() { - cachedLastValuePair = null; + public void setLastCacheContainer(ILastCacheContainer lastCacheContainer) { + this.lastCacheContainer = lastCacheContainer; } @Override diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java index e9f9b2ec4738..ba6fa8e45fc2 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/tag/TagManager.java @@ -24,9 +24,9 @@ import org.apache.iotdb.db.engine.storagegroup.StorageGroupProcessor; import org.apache.iotdb.db.exception.StorageEngineException; import org.apache.iotdb.db.exception.metadata.MetadataException; -import org.apache.iotdb.db.metadata.MTree; import org.apache.iotdb.db.metadata.MetadataConstant; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan; @@ -152,7 +152,8 @@ public List getMatchedTimeseriesInIndex( allMatchedNodes.stream() .sorted( Comparator.comparingLong( - (IMeasurementMNode mNode) -> MTree.getLastTimeStamp(mNode, context)) + (IMeasurementMNode mNode) -> + LastCacheManager.getLastTimeStamp(mNode, context)) .reversed() .thenComparing(IMNode::getFullPath)) .collect(toList()); diff --git a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java index 0c1c19c3ecdb..ed9888be83a3 100644 --- a/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java +++ b/server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java @@ -27,6 +27,7 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.metadata.PartialPath; +import org.apache.iotdb.db.metadata.VectorPartialPath; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.qp.physical.crud.LastQueryPlan; import org.apache.iotdb.db.qp.physical.crud.RawDataQueryPlan; @@ -270,7 +271,17 @@ public TimeValuePair read() { try { node = (IMeasurementMNode) IoTDB.metaManager.getNodeByPath(path); } catch (MetadataException e) { - TimeValuePair timeValuePair = IoTDB.metaManager.getLastCache(path); + TimeValuePair timeValuePair; + // cluster mode may not get remote node + if (path instanceof VectorPartialPath) { + // the seriesPath has been transformed to vector path + // here needs subSensor path + timeValuePair = + IoTDB.metaManager.getLastCache( + ((VectorPartialPath) path).getSubSensorsPathList().get(0)); + } else { + timeValuePair = IoTDB.metaManager.getLastCache(path); + } if (timeValuePair != null) { return timeValuePair; } @@ -279,11 +290,34 @@ public TimeValuePair read() { if (node == null) { return null; } - return node.getCachedLast(); + + if (path instanceof VectorPartialPath) { + // the seriesPath has been transformed to vector path + // here needs subSensor path + return IoTDB.metaManager.getLastCache( + node, ((VectorPartialPath) path).getSubSensorsPathList().get(0).getMeasurement()); + } else { + return IoTDB.metaManager.getLastCache(node); + } } public void write(TimeValuePair pair) { - IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE, node); + if (node == null) { + IoTDB.metaManager.updateLastCache(path, pair, false, Long.MIN_VALUE); + } else { + if (path instanceof VectorPartialPath) { + // the seriesPath has been transformed to vector path + // here needs subSensor path + IoTDB.metaManager.updateLastCache( + node, + ((VectorPartialPath) path).getSubSensorsPathList().get(0).getMeasurement(), + pair, + false, + Long.MIN_VALUE); + } else { + IoTDB.metaManager.updateLastCache(node, pair, false, Long.MIN_VALUE); + } + } } } diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java index 71db7cb8b911..9b84d521fed8 100644 --- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java +++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java @@ -162,7 +162,7 @@ public void testTTLWrite() plan.setMeasurementMNodes( new IMeasurementMNode[] { new MeasurementMNode( - null, null, new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null) + null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null) }); plan.transferType(); @@ -195,7 +195,7 @@ private void prepareData() plan.setMeasurementMNodes( new IMeasurementMNode[] { new MeasurementMNode( - null, null, new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null) + null, "s1", new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN), null) }); plan.transferType(); diff --git a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java index 234e0d7906e5..e30e923d5959 100644 --- a/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java +++ b/server/src/test/java/org/apache/iotdb/db/integration/IoTDBLastIT.java @@ -20,8 +20,6 @@ import org.apache.iotdb.db.exception.metadata.MetadataException; import org.apache.iotdb.db.metadata.PartialPath; -import org.apache.iotdb.db.metadata.mnode.IMNode; -import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.service.IoTDB; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.jdbc.Config; @@ -225,16 +223,14 @@ public void lastCacheUpdateTest() throws SQLException, MetadataException { } } - IMeasurementMNode node = - (IMeasurementMNode) - IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt01.temperature")); - node.resetCache(); + PartialPath path = new PartialPath("root.ln.wf01.wt01.temperature"); + IoTDB.metaManager.resetLastCache(path); statement.execute( "insert into root.ln.wf01.wt01(time, temperature, status, id) values(700, 33.1, false, 3)"); // Last cache is updated with above insert sql - long time = node.getCachedLast().getTimestamp(); + long time = IoTDB.metaManager.getLastCache(path).getTimestamp(); Assert.assertEquals(700, time); hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01"); @@ -258,7 +254,7 @@ public void lastCacheUpdateTest() throws SQLException, MetadataException { "insert into root.ln.wf01.wt01(time, temperature, status, id) values(600, 19.1, false, 1)"); // Last cache is not updated with above insert sql - time = node.getCachedLast().getTimestamp(); + time = IoTDB.metaManager.getLastCache(path).getTimestamp(); Assert.assertEquals(700, time); hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt01"); @@ -297,9 +293,9 @@ public void lastWithUnSeqFilesTest() throws SQLException, MetadataException { DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - IMNode node = - IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt02.temperature")); - ((IMeasurementMNode) node).resetCache(); + PartialPath path = new PartialPath("root.ln.wf01.wt02.temperature"); + IoTDB.metaManager.resetLastCache(path); + boolean hasResultSet = statement.execute("select last temperature,status,id from root.ln.wf01.wt02"); @@ -343,7 +339,7 @@ public void lastWithUnSeqFilesTest() throws SQLException, MetadataException { } Assert.assertEquals(cnt, retArray.length); - ((IMeasurementMNode) node).resetCache(); + IoTDB.metaManager.resetLastCache(path); String[] retArray3 = new String[] { "900,root.ln.wf01.wt01.temperature,10.2,DOUBLE", @@ -387,9 +383,7 @@ public void lastWithEmptyChunkMetadataTest() throws SQLException, MetadataExcept DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root"); Statement statement = connection.createStatement()) { - IMNode node = - IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt03.temperature")); - ((IMeasurementMNode) node).resetCache(); + IoTDB.metaManager.resetLastCache(new PartialPath("root.ln.wf01.wt03.temperature")); statement.execute( "INSERT INTO root.ln.wf01.wt03(timestamp,status, id) values(500, false, 9)"); @@ -438,9 +432,7 @@ public void lastWithUnseqTimeLargerThanSeqTimeTest() throws SQLException, Metada statement.execute("INSERT INTO root.ln.wf01.wt04(timestamp,temperature) values(150,31.2)"); statement.execute("flush"); - IMNode node = - IoTDB.metaManager.getNodeByPath(new PartialPath("root.ln.wf01.wt04.temperature")); - ((IMeasurementMNode) node).resetCache(); + IoTDB.metaManager.resetLastCache(new PartialPath("root.ln.wf01.wt04.temperature")); boolean hasResultSet = statement.execute("select last temperature from root.ln.wf01.wt04"); diff --git a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java index 21e3c0133bf1..b31cbcc0ad47 100644 --- a/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java +++ b/server/src/test/java/org/apache/iotdb/db/metadata/MManagerAdvancedTest.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.metadata; import org.apache.iotdb.db.exception.metadata.MetadataException; +import org.apache.iotdb.db.metadata.lastCache.LastCacheManager; import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.service.IoTDB; @@ -218,13 +219,12 @@ public void testCachedLastTimeValue() throws MetadataException { TimeValuePair tv1 = new TimeValuePair(1000, TsPrimitiveType.getByType(TSDataType.DOUBLE, 1.0)); TimeValuePair tv2 = new TimeValuePair(2000, TsPrimitiveType.getByType(TSDataType.DOUBLE, 3.0)); TimeValuePair tv3 = new TimeValuePair(1500, TsPrimitiveType.getByType(TSDataType.DOUBLE, 2.5)); - IMNode node = mmanager.getNodeByPath(new PartialPath("root.vehicle.d2.s0")); - ((IMeasurementMNode) node).updateCachedLast(tv1, true, Long.MIN_VALUE); - ((IMeasurementMNode) node).updateCachedLast(tv2, true, Long.MIN_VALUE); - Assert.assertEquals( - tv2.getTimestamp(), ((IMeasurementMNode) node).getCachedLast().getTimestamp()); - ((IMeasurementMNode) node).updateCachedLast(tv3, true, Long.MIN_VALUE); - Assert.assertEquals( - tv2.getTimestamp(), ((IMeasurementMNode) node).getCachedLast().getTimestamp()); + PartialPath path = new PartialPath("root.vehicle.d2.s0"); + IMeasurementMNode node = (IMeasurementMNode) mmanager.getNodeByPath(path); + LastCacheManager.updateLastCache(path, tv1, true, Long.MIN_VALUE, node); + LastCacheManager.updateLastCache(path, tv2, true, Long.MIN_VALUE, node); + Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp()); + LastCacheManager.updateLastCache(path, tv3, true, Long.MIN_VALUE, node); + Assert.assertEquals(tv2.getTimestamp(), mmanager.getLastCache(node).getTimestamp()); } }