[IOTDB-298] Last time-value query (#821)
Conversation
JackieTien97
left a comment
First, you should pay attention to your code format.
- Our indentation is two spaces; yours is four.
- You have lots of unused import statements; please remove them.
Second, you should add some UTs and ITs.
Last, there is some wrong usage of IAggregateReader, which I have commented on.
```java
while (seriesReader.hasNextChunk()) {
  while (seriesReader.hasNextPage()) {
    // cal by page data
    while (seriesReader.hasNextOverlappedPage()) {
      BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
      int maxIndex = nextOverlappedPageData.length() - 1;
      if (maxIndex < 0) {
        continue;
      }
      long time = nextOverlappedPageData.getTimeByIndex(maxIndex);
      if (time > maxTime) {
        maxTime = time;
        queryResult.setPairResult(maxTime, nextOverlappedPageData.getValueInTimestamp(time), tsDataType);
      }
      nextOverlappedPageData.resetBatchData();
    }
  }
}
```
You can use the chunk and page statistics if possible. The code would look like this:
```java
while (seriesReader.hasNextChunk()) {
  if (seriesReader.canUseCurrentChunkStatistics()) {
    if (seriesReader.currentChunkStatistics().getEndTime() > maxTime) {
      maxTime = seriesReader.currentChunkStatistics().getEndTime();
      queryResult.setPairResult(maxTime, seriesReader.currentChunkStatistics().getLastValue(), tsDataType);
    }
    seriesReader.skipCurrentChunk();
    continue;
  }
  while (seriesReader.hasNextPage()) {
    if (seriesReader.canUseCurrentPageStatistics()) {
      if (seriesReader.currentPageStatistics().getEndTime() > maxTime) {
        maxTime = seriesReader.currentPageStatistics().getEndTime();
        queryResult.setPairResult(maxTime, seriesReader.currentPageStatistics().getLastValue(), tsDataType);
      }
      seriesReader.skipCurrentPage();
      continue;
    }
    // cal by page data
    while (seriesReader.hasNextOverlappedPage()) {
      BatchData nextOverlappedPageData = seriesReader.nextOverlappedPage();
      int maxIndex = nextOverlappedPageData.length() - 1;
      if (maxIndex < 0) {
        continue;
      }
      long time = nextOverlappedPageData.getTimeByIndex(maxIndex);
      if (time > maxTime) {
        maxTime = time;
        queryResult.setPairResult(maxTime, nextOverlappedPageData.getValueInTimestamp(time), tsDataType);
      }
      nextOverlappedPageData.resetBatchData();
    }
  }
}
```
server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
(outdated, resolved)
```java
}
insertPlan.setDataTypes(dataTypes);
storageEngine.insert(insertPlan);
insertPlan.updateMNodeLastValues(node);
```
It is not suitable to put the updateMNodeLastValues method in the InsertPlan.
You can update the last value in the preceding loop, where you already have the measurementNode.
The update function has been moved into the MNode structure.
I added a loop to update the last value after storageEngine.insert(). insert() could fail and throw an exception; in that case the last-value cache will not be updated.
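The ordering described above can be sketched in a standalone way (this is a hypothetical illustration, not the PR's actual code; the storage engine is faked with a method that throws for negative timestamps):

```java
public class InsertThenUpdateCache {
  static long cachedLastTime = Long.MIN_VALUE;

  // Stand-in for storageEngine.insert(insertPlan); throws on bad input.
  static void insert(long time) {
    if (time < 0) {
      throw new IllegalArgumentException("insert failed");
    }
  }

  static void insertAndUpdateCache(long time) {
    insert(time);          // may throw
    cachedLastTime = time; // only reached when insert() succeeded
  }

  public static void main(String[] args) {
    insertAndUpdateCache(5);
    try {
      insertAndUpdateCache(-1); // fails, so the cache must stay at 5
    } catch (IllegalArgumentException ignored) {
    }
    System.out.println(cachedLastTime); // prints 5
  }
}
```

Because the cache assignment sits after the insert call, a failed insert never pollutes the cached last value.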
```diff
-return storageEngine.insertBatch(batchInsertPlan);
+Integer[] results = storageEngine.insertBatch(batchInsertPlan);
+batchInsertPlan.updateMNodeLastValues(node);
```
```diff
 SFW, JOIN, UNION, FILTER, GROUPBY, ORDERBY, LIMIT, SELECT, SEQTABLESCAN, HASHTABLESCAN,
 MERGEJOIN, FILEREAD, NULL, TABLESCAN, UPDATE, INSERT, BATCHINSERT, DELETE, BASIC_FUNC, IN, QUERY, MERGEQUERY,
-AGGREGATION, AUTHOR, FROM, FUNC, LOADDATA, METADATA, PROPERTY, INDEX, INDEXQUERY, FILL,
+AGGREGATION, AUTHOR, FROM, FUNC, LOADDATA, METADATA, PROPERTY, INDEX, INDEXQUERY, FILL, LAST,
```
It's better to add it at the end, because you don't know whether programmers are relying on the enum's ordinal() function.
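The ordinal-stability concern can be shown with a tiny standalone example (enum names are shortened here for illustration): inserting LAST mid-list silently shifts the ordinal of every later constant, while appending keeps all existing ordinals stable.

```java
// Illustrative only: trimmed-down versions of the operator-type enum.
enum BeforeChange { INDEXQUERY, FILL, AGGREGATION }
enum InsertedMidList { INDEXQUERY, FILL, LAST, AGGREGATION } // LAST in the middle
enum AppendedAtEnd { INDEXQUERY, FILL, AGGREGATION, LAST }   // LAST at the end

public class OrdinalDemo {
  public static void main(String[] args) {
    // Inserting mid-list shifts AGGREGATION from ordinal 2 to 3 ...
    System.out.println(InsertedMidList.AGGREGATION.ordinal()); // prints 3
    // ... while appending keeps every pre-existing ordinal unchanged.
    System.out.println(AppendedAtEnd.AGGREGATION.ordinal());   // prints 2
  }
}
```

Any code that serialized or compared ordinals before the change keeps working only in the append case.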
```java
public void updateMNodeLastValues(MNode node) {
  long maxTime = Long.MIN_VALUE;
  int maxIndex = 0;
  for (int i = 0; i < times.length; i++) {
    if (times[i] > maxTime) {
      maxTime = times[i];
      maxIndex = i;
    }
  }
  for (int i = 0; i < measurements.length; i++) {
    if (node.hasChild(measurements[i])) {
      Object[] column = (Object[]) columns[i];
      node.getChild(measurements[i]).updateCachedLast(maxTime, column[maxIndex], dataTypes[i]);
    }
  }
}
```
This function is not the InsertPlan's responsibility.
Agree. Moved it out of InsertPlan
```java
public void updateMNodeLastValues(MNode node) throws QueryProcessException {
  for (int i = 0; i < measurements.length; i++) {
    if (node.hasChild(measurements[i])) {
      Object value = CommonUtils.parseValue(dataTypes[i], values[i]);
      node.getChild(measurements[i]).updateCachedLast(time, value, dataTypes[i]);
    }
  }
}
```
server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
(outdated, resolved)
JackieTien97
left a comment
There are still some places that need to be changed.
UTs and ITs are needed.
You should also add documentation in both the EN and CH docs describing the LAST grammar, with some examples.
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
(outdated, resolved)
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
(outdated, resolved)
```java
long maxTime = Long.MIN_VALUE;
int maxIndex = 0;
for (int i = 0; i < times.length; i++) {
  if (times[i] > maxTime) {
    maxTime = times[i];
    maxIndex = i;
  }
}
```
No matter what measurementIndex is, this calculation is the same, so it only needs to be done once. You can extract it outside the calling loop.
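A minimal sketch of the suggested extraction (the method and variable names are illustrative, not the PR's): compute the max-time index once, then reuse it for every measurement instead of rescanning the times array per measurementIndex.

```java
public class MaxTimeOnce {
  // Index of the largest timestamp, computed a single time.
  static int indexOfMaxTime(long[] times) {
    long maxTime = Long.MIN_VALUE;
    int maxIndex = 0;
    for (int i = 0; i < times.length; i++) {
      if (times[i] > maxTime) {
        maxTime = times[i];
        maxIndex = i;
      }
    }
    return maxIndex;
  }

  public static void main(String[] args) {
    long[] times = {3L, 9L, 5L};
    String[] measurements = {"s1", "s2"};
    int maxIndex = indexOfMaxTime(times); // done once, outside the loop below
    for (String measurement : measurements) {
      // reuse maxIndex for every measurement column
      System.out.println(measurement + " -> index " + maxIndex);
    }
  }
}
```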
```java
return;
if (cachedLastValuePair == null) {
  cachedLastValuePair = new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
} else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()) {
```
Suggested change:

```diff
-} else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()) {
+} else if (timeValuePair.getTimestamp() >= cachedLastValuePair.getTimestamp()) {
```
If you insert time-value (1,1) and then insert time-value (1,2), the value at time 1 should be updated to 2.
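A standalone sketch of why the comparison must be `>=` (the cached pair is simplified to two static longs here; this is not the PR's TimeValuePair API): with strict `>`, a second write at the same timestamp would be dropped.

```java
public class SameTimestampUpdate {
  static long cachedTime = Long.MIN_VALUE;
  static long cachedValue;

  static void updateCachedLast(long time, long value) {
    // '>=' so a second write at the same timestamp overwrites the value
    if (time >= cachedTime) {
      cachedTime = time;
      cachedValue = value;
    }
  }

  public static void main(String[] args) {
    updateCachedLast(1, 1); // insert time-value 1-1
    updateCachedLast(1, 2); // insert time-value 1-2
    System.out.println(cachedValue); // prints 2; with '>' it would stay 1
  }
}
```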
```java
public TimeValuePair composeLastTimeValuePair(int measurementIndex) {
  long maxTime = Long.MIN_VALUE;
  int maxIndex = 0;
  for (int i = 0; i < times.length; i++) {
```
The times in BatchInsertPlan are always in ascending order, so you could directly get the last one.
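Since the times are sorted ascending, the scan collapses to an index lookup; a trivial sketch with illustrative data:

```java
public class LastOfSortedTimes {
  static long lastTime(long[] ascendingTimes) {
    // times are ascending, so the max timestamp is simply the final element
    return ascendingTimes[ascendingTimes.length - 1];
  }

  public static void main(String[] args) {
    System.out.println(lastTime(new long[]{1L, 5L, 9L})); // prints 9
  }
}
```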
```java
LastFunctionCallContext functionCallContext = ctx.lastFunctionCall();
Path path = parseSuffixPath(functionCallContext.suffixPath());
selectOp.addLastPath(path, functionCallContext.LAST().getText());
queryOp.setSelectOperator(selectOp);
```
```java
  lastQueryResultList.add(lastQueryResult);
}

RowRecord resultRecord = constructLastRowRecord(lastQueryResultList);
```
The result format should be updated to: path, time, value.
```java
  return resultRecord;
}

class LastQueryResult {
```
I think this class is not needed; you could use TimeValuePair directly, unless you store the path of a series in this class.
Removed in the new version
```java
}

// construct series reader without value filter
Filter timeFilter = null;
```
Remove this field and use null directly.
```java
  }
}
if (queryResult.hasResult())
  node.setCachedLast(queryResult.getPairResult());
```
Notice: consider a point being inserted while a query is running.
It's better to remove setCachedLast and make the updateCachedLast method synchronized. Then extend updateCachedLast with a parameter, boolean highPriority, which determines whether to update the value when the time is equal to the previously cached one.
In the normal insert path, update the cache with high priority; here, update it with low priority.
This resolves the case where there is an insert of (10,11) while you are getting the last point (10,10) from disk.
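A hedged sketch of the proposed scheme (class and method names here are illustrative, and the cached pair is simplified to two longs): a synchronized update where high-priority insert-path updates win ties on equal timestamps, while low-priority query-path updates do not.

```java
public class LastPointCache {
  private long cachedTime = Long.MIN_VALUE;
  private long cachedValue;
  private boolean hasCache = false;

  public synchronized void updateCachedLast(long time, long value, boolean highPriority) {
    // Strictly newer points always win; on an exact timestamp tie, only a
    // high-priority (insert-path) update may overwrite the cached value.
    if (!hasCache || time > cachedTime || (highPriority && time == cachedTime)) {
      cachedTime = time;
      cachedValue = value;
      hasCache = true;
    }
  }

  public synchronized long cachedValue() {
    return cachedValue;
  }

  public static void main(String[] args) {
    LastPointCache cache = new LastPointCache();
    cache.updateCachedLast(10, 11, true);  // concurrent insert writes (10, 11)
    cache.updateCachedLast(10, 10, false); // query reads stale (10, 10) from disk
    System.out.println(cache.cachedValue()); // prints 11: the insert wins the tie
  }
}
```

With this tie-breaking rule, the query result read from disk can never roll the cache back over a concurrent insert at the same timestamp.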
The following example queries the data at the most recent timestamp of the timeseries root.ln.wf01.wt01.status:

```
select last(status) from root.ln.wf01.wt01 disable align
```
```java
 */
protected String fullPath;

private TimeValuePair cachedLastValuePair = null;
```
```java
  return cachedLastValuePair;
}

public synchronized void updateCachedLast(
```
Add javadoc for highPriorityUpdate.
```java
private List<Path> suffixList;
private List<String> aggregations;
private boolean hasLast;
```

Suggested change:

```diff
-private boolean hasLast;
+private boolean lastQuery;
```
```java
queryPlan = new AggregationPlan();
((AggregationPlan) queryPlan)
    .setAggregations(queryOperator.getSelectOperator().getAggregations());
} else if (queryOperator.hasLast()) {
```

Suggested change:

```diff
-} else if (queryOperator.hasLast()) {
+} else if (queryOperator.isLastQuery()) {
```
server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
(outdated, resolved)
server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
(outdated, resolved)
```java
  plan.setDataTypeConsistencyChecker(null);
}

private void getLastQueryHeaders(
```
I don't remember: doesn't respColumns contain the time column?
```diff
   throw new QueryProcessException(e);
   }
 }

@@ -178,6 +180,12 @@ public class StorageGroupProcessor {
  * file.
  */
 private Map<Long, Map<String, Long>> latestFlushedTimeForEachDevice = new HashMap<>();
```
Rename this to latestDeviceFlushedTimeInEachPartition to distinguish it from globalLatestFlushedTimeForEachDevice.
```java
latestFlushedTimeForEachDevice.computeIfAbsent(timePartitionId, id -> new HashMap<>())
    .putIfAbsent(insertPlan.getDeviceId(), Long.MIN_VALUE);

long latestFlushedTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
```

Suggested change:

```diff
-long latestFlushedTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
+long globalLatestFlushTime = globalLatestFlushedTimeForEachDevice.computeIfAbsent(
```
```java
if (latestFlushedTime < insertPlan.getTime())
  globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
```

Suggested change:

```diff
-if (latestFlushedTime < insertPlan.getTime())
-  globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
+if (latestFlushedTime < insertPlan.getTime()) {
+  globalLatestFlushedTimeForEachDevice.put(insertPlan.getDeviceId(), insertPlan.getTime());
+}
```
Then, move these three lines into insertToTsFileProcessor(), so they are updated at the same time as lastFlushTimeForEachDevice.
Yes, moved. Looks better now.
```java
  }
}

public void updateInsertPlanLast(InsertPlan plan, Long latestFlushedTime)
```

Suggested change:

```diff
-public void updateInsertPlanLast(InsertPlan plan, Long latestFlushedTime)
+public void tryToUpdateCache(InsertPlan plan, Long latestFlushedTime)
```
server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
(outdated, resolved)
```java
TimeValuePair resultPair = new TimeValuePair(Long.MIN_VALUE, null);

for (int i = seqFileResources.size() - 1; i >= 0; i--) {
```
No need to loop; get the last one directly.
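Since the sequence files are time-ordered, the most recent one is simply the final list element; a trivial sketch with placeholder file names:

```java
import java.util.Arrays;
import java.util.List;

public class LatestSeqFile {
  static String latest(List<String> seqFileResources) {
    // sequence files are time-ordered, so index the last one directly
    return seqFileResources.get(seqFileResources.size() - 1);
  }

  public static void main(String[] args) {
    List<String> files = Arrays.asList("seq-1.tsfile", "seq-2.tsfile", "seq-3.tsfile");
    System.out.println(latest(files)); // prints seq-3.tsfile
  }
}
```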
server/src/main/java/org/apache/iotdb/db/query/executor/LastQueryExecutor.java
(outdated, resolved)
```java
}

// Update cached last value with low priority
((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
```
When I saw this line, I thought it might be better to move the lastFlushTime parameter out of the updateCachedLast method and do the check during insertion instead.
```java
List<TsFileResource> seqFileResources = dataSource.getSeqResources();
List<TsFileResource> unseqFileResources = dataSource.getUnseqResources();

TimeValuePair resultPair = new TimeValuePair(Long.MIN_VALUE, null);
```

Suggested change:

```diff
-TimeValuePair resultPair = new TimeValuePair(Long.MIN_VALUE, null);
+TimeValuePair resultPair;
```
## Last Cache Update Strategy

The logic for updating the Last cache lives in the `updateCachedLast` method of `LeafNode`, which introduces two extra parameters: `highPriorityUpdate` and `latestFlushTime`. `highPriorityUpdate` indicates whether this update is high priority; cache updates caused by newly written data are considered high priority, while cache updates made during queries default to low priority. `latestFlushTime` records the maximum timestamp of the data that has already been flushed to disk.

Suggested change: `LeafNode` should be `LeafMNode`.
JackieTien97
left a comment
No need to check whether the chunkLoader is null; otherwise it will cause the file-closed exception as before. Please fix it.
```java
if (data.getChunkLoader() == null) {
  TsFileSequenceReader tsFileSequenceReader =
      FileReaderManager.getInstance().get(resource, resource.isClosed());
  data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
```
Remove the if judgement; it will cause the file-closed null pointer exception. Like:

```java
for (ChunkMetaData data : currentChunkMetaDataList) {
  TsFileSequenceReader tsFileSequenceReader =
      FileReaderManager.getInstance().get(resource, resource.isClosed());
  data.setChunkLoader(new DiskChunkLoader(tsFileSequenceReader));
}
```

It's OK to do so because the tsFileSequenceReader has been cached.
JIRA issue: https://issues.apache.org/jira/browse/IOTDB-298
This PR implements a new LAST query for IoTDB.
The LAST query returns the most recent value of the given timeseries in a time-value format.
The basic SQL syntax of the LAST query is `select last(<path>) from <prefixPath>`, for example `select last(status) from root.ln.wf01.wt01`.
To trace the latest timestamp and value as quickly as possible, this PR adds a cache that stores the latest time and value in MNodes. This cache is updated when values with larger timestamps are written to the specific timeseries.