Commit
Merge branch 'master' into cherry_pick_cluster
# Conflicts:
#	server/src/main/java/org/apache/iotdb/db/engine/storagegroup/StorageGroupProcessor.java
#	server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
#	server/src/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
#	server/src/main/java/org/apache/iotdb/db/query/executor/AggregationExecutor.java
#	server/src/main/java/org/apache/iotdb/db/service/TSServiceImpl.java
#	server/src/test/java/org/apache/iotdb/db/integration/IoTDBSeriesReaderIT.java
jt2594838 committed Mar 3, 2020
2 parents 646386c + c00b63f commit cb8ecb8
Showing 87 changed files with 2,139 additions and 870 deletions.
@@ -37,3 +37,4 @@
 * [Raw data query](/#/SystemDesign/progress/chap5/sec3)
 * [Aggregation query](/#/SystemDesign/progress/chap5/sec4)
 * [Downsampling query](/#/SystemDesign/progress/chap5/sec5)
+* [Last timestamp query](/#/SystemDesign/progress/chap5/sec6)
@@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {

A reference to the current first chunk metadata.

- * private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
+ * private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
      new PriorityQueue<>(
          Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));

@@ -272,35 +272,35 @@ while (aggregateReader.hasNextChunk()) {

This method checks whether there is a next page. It is typically used when `firstChunkMetaData` cannot be used directly and must be further unpacked into pages.

- First call `fillOverlappedPageReaders()` to unpack `firstChunkMetaData` into `PageReader`s and put them all into `overlappedPageReaders`, then set `hasCachedFirstChunkMetadata` to `false` and `firstChunkMetaData` to `null`. Return `false` if `overlappedPageReaders` is empty, and `true` otherwise.
+ First call `fillOverlappedPageReaders()` to unpack `firstChunkMetaData` into `PageReader`s and put them all into `cachedPageReaders`, then set `hasCachedFirstChunkMetadata` to `false` and `firstChunkMetaData` to `null`. Return `false` if `cachedPageReaders` is empty, and `true` otherwise.
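
The flow above can be summarized in a short sketch (a simplified illustration of the described logic, not the exact IoTDB source):
```
public boolean hasNextPage() throws IOException {
  if (hasCachedFirstChunkMetadata) {
    // unpack firstChunkMetaData into PageReaders and enqueue them into
    // cachedPageReaders, ordered by page start time
    fillOverlappedPageReaders();
    hasCachedFirstChunkMetadata = false;
    firstChunkMetaData = null;
  }
  return !cachedPageReaders.isEmpty();
}
```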

#### isPageOverlapped()

This method checks whether any other page overlaps the current page.

- If `mergeReader` still holds data, or `seqChunkMetadatas` contains a chunk whose time range overlaps the first `PageReader` in `overlappedPageReaders`, or `unseqChunkMetadatas` contains such a chunk, return `true`; otherwise return `false`.
+ If `mergeReader` still holds data, or `seqChunkMetadatas` contains a chunk whose time range overlaps the first `PageReader` in `cachedPageReaders`, or `unseqChunkMetadatas` contains such a chunk, return `true`; otherwise return `false`.
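
A rough sketch of this check (illustrative only; `overlapsInTime` is a hypothetical helper that tests whether any chunk's time range intersects the first cached page, and the `mergeReader` iteration call is named loosely):
```
public boolean isPageOverlapped() {
  // leftover points in mergeReader mean the current page is already being merged
  if (mergeReader.hasNext()) {
    return true;
  }
  Statistics firstPageStatistics = cachedPageReaders.peek().data.getStatistics();
  // hypothetical helper: does any chunk's time range intersect
  // [firstPageStatistics.getStartTime(), firstPageStatistics.getEndTime()]?
  return overlapsInTime(seqChunkMetadatas, firstPageStatistics)
      || overlapsInTime(unseqChunkMetadatas, firstPageStatistics);
}
```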

#### nextPage()

Must be used together with the `isPageOverlapped()` method.

`overlappedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`overlappedPageReaders`的第一个Page里符合过滤条件的所有data。
`cachedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`cachedPageReaders`的第一个Page里符合过滤条件的所有data。

#### currentPageStatistics()

- Returns the statistics of the first page in `overlappedPageReaders`.
+ Returns the statistics of the first page in `cachedPageReaders`.

#### skipCurrentPage()

- Skips the current page: simply remove the first `PageReader` from `overlappedPageReaders`.
+ Skips the current page: simply remove the first `PageReader` from `cachedPageReaders`.

#### hasNextOverlappedPage()

This method checks whether overlapped pages remain.

If `hasCachedNextBatch` is `true`, return `true` directly.

- Otherwise, first call `putAllDirectlyOverlappedPageReadersIntoMergeReader()` to put every `PageReader` overlapping the first page of `overlappedPageReaders` into `mergeReader`. `mergeReader` maintains a `currentLargestEndTime` variable, updated each time a new reader is added, which records the largest end time among the readers added so far.
+ Otherwise, first call `putAllDirectlyOverlappedPageReadersIntoMergeReader()` to put every `PageReader` overlapping the first page of `cachedPageReaders` into `mergeReader`. `mergeReader` maintains a `currentLargestEndTime` variable, updated each time a new reader is added, which records the largest end time among the readers added so far.

Then take the current largest end time from `mergeReader` as the end time of the batch to be returned, recorded as `currentPageEndTime`, and iterate over `mergeReader` until the current timestamp exceeds `currentPageEndTime`.
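
A condensed sketch of the whole merge step (simplified; the reader-iteration calls and batch-append API are named illustratively, and `getCurrentLargestEndTime()` is an assumed accessor for the `currentLargestEndTime` variable described above):
```
private boolean hasNextOverlappedPage() throws IOException {
  if (hasCachedNextBatch) {
    return true;
  }
  putAllDirectlyOverlappedPageReadersIntoMergeReader();
  if (!mergeReader.hasNext()) {
    return false;
  }
  // the largest end time added so far bounds the batch returned this round
  long currentPageEndTime = mergeReader.getCurrentLargestEndTime();
  cachedBatchData = new BatchData(dataType);
  while (mergeReader.hasNext()) {
    TimeValuePair timeValuePair = mergeReader.next();
    if (timeValuePair.getTimestamp() > currentPageEndTime) {
      break;
    }
    // append (timestamp, value) to the batch; the append API shape is illustrative
    cachedBatchData.putAnObject(timeValuePair.getTimestamp(), timeValuePair.getValue().getValue());
  }
  hasCachedNextBatch = cachedBatchData.hasCurrent();
  return hasCachedNextBatch;
}
```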

119 changes: 119 additions & 0 deletions docs/Documentation-CHN/SystemDesign/5-DataQuery/6-LastQuery.md
@@ -0,0 +1,119 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Last (Latest Timestamp) Query

The main logic of the Last query is in LastQueryExecutor:

* org.apache.iotdb.db.query.executor.LastQueryExecutor

A Last query executes the `calculateLastPairForOneSeries` method for each specified time series.
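
A hedged sketch of that per-series loop (only `calculateLastPairForOneSeries` is named in this document; the surrounding variables are illustrative):
```
// for each selected series, compute its (timestamp, value) Last pair;
// each pair becomes one row of the three-column disable-align result set
for (int i = 0; i < selectedSeries.size(); i++) {
  TimeValuePair lastPair =
      calculateLastPairForOneSeries(selectedSeries.get(i), dataTypes.get(i), context);
  // ... emit (lastPair.getTimestamp(), selectedSeries.get(i), lastPair.getValue()) as a row
}
```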

## Reading cached data from the MNode

We add a Last data cache to the MNode structure of each time series to be queried. For a Last query on a given time series, the `calculateLastPairForOneSeries` method first tries to read the cached data from the MNode.
```
try {
  node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
} catch (MetadataException e) {
  throw new QueryProcessException(e);
}
if (((LeafMNode) node).getCachedLast() != null) {
  return ((LeafMNode) node).getCachedLast();
}
```
If the cache has never been written, the standard query flow below is executed to read TsFile data.

## Standard Last query flow

The standard Last query flow traverses all sequential and unsequential files to obtain the query result, and finally writes the result back to the MNode cache. Sequential and unsequential files are handled separately.
- Since sequential files are already sorted by write time, directly use the `loadChunkMetadataFromTsFileResource` method to fetch the last `ChunkMetadata`, and obtain the largest timestamp and its corresponding value from the `ChunkMetadata` statistics.
```
if (!seqFileResources.isEmpty()) {
  List<ChunkMetaData> chunkMetadata =
      FileLoaderUtils.loadChunkMetadataFromTsFileResource(
          seqFileResources.get(seqFileResources.size() - 1), seriesPath, context);
  if (!chunkMetadata.isEmpty()) {
    ChunkMetaData lastChunkMetaData = chunkMetadata.get(chunkMetadata.size() - 1);
    Statistics chunkStatistics = lastChunkMetaData.getStatistics();
    resultPair =
        constructLastPair(
            chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
  }
}
```
- Unsequential files require traversing all `ChunkMetadata` structures to find the data with the largest timestamp. Note that when multiple `ChunkMetadata` share the same timestamp, we take the data from the `ChunkMetadata` with the largest `version` as the Last result.

```
long version = 0;
for (TsFileResource resource : unseqFileResources) {
  if (resource.getEndTimeMap().get(seriesPath.getDevice()) < resultPair.getTimestamp()) {
    break;
  }
  List<ChunkMetaData> chunkMetadata =
      FileLoaderUtils.loadChunkMetadataFromTsFileResource(resource, seriesPath, context);
  for (ChunkMetaData chunkMetaData : chunkMetadata) {
    if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
        && chunkMetaData.getVersion() > version) {
      Statistics chunkStatistics = chunkMetaData.getStatistics();
      resultPair =
          constructLastPair(
              chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
      version = chunkMetaData.getVersion();
    }
  }
}
```
- Finally, write the query result into the MNode's Last cache:
```
((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
```

## Last cache update policy

The logic for updating the Last cache lives in the `updateCachedLast` method of `LeafMNode`, which introduces two extra parameters, `highPriorityUpdate` and `latestFlushedTime`. `highPriorityUpdate` indicates whether this update has high priority: cache updates caused by writing new data are considered high priority, while cache updates from queries default to low priority. `latestFlushedTime` records the largest timestamp of data that has already been flushed to disk.

The cache update policy is as follows:

1. When the cache holds no record, write the result of a Last query directly into the cache.
2. When the cache holds no record, write newly inserted data into the cache if its timestamp is greater than or equal to `latestFlushedTime`.
3. When the cache already holds a record, compare the timestamp of the queried or written data against the cached timestamp: written data has high priority and updates the cache when its timestamp is not smaller than the cached one, while queried data has low priority and updates the cache only when its timestamp is strictly greater.

The concrete code is as follows:
```
public synchronized void updateCachedLast(
    TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
  if (timeValuePair == null || timeValuePair.getValue() == null) return;
  if (cachedLastValuePair == null) {
    // If no cached last, (1) a last query, (2) an unseq insertion or (3) a seq insertion will update the cache.
    if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
      cachedLastValuePair =
          new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
    }
  } else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
      || (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
          && highPriorityUpdate)) {
    cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
    cachedLastValuePair.setValue(timeValuePair.getValue());
  }
}
```
@@ -276,6 +276,39 @@ The query paths in the SELECT clause of GROUP BY must be aggregate functions, otherwise the system will give the corresponding error message

<center><img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/16079446/69116099-0b715300-0ac6-11ea-8074-84e04797b8c7.png"></center>

### Last timestamp query

The corresponding SQL statement is:

```
select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <DISABLE ALIGN>
```
Its meaning is:

Query the data with the latest timestamp in the time series prefixPath.path.

The following example queries the data with the latest timestamp in the time series root.ln.wf01.wt01.status:
```
select last status from root.ln.wf01.wt01 disable align
```
The result set is returned in the following form:
```
| Time | Path | Value |
| --- | ----------------------- | ----- |
| 5 | root.ln.wf01.wt01.status| 100 |
```

Assuming root.ln.wf01.wt01 contains multiple columns of data, such as id, status and temperature, the following example returns the records of these columns with their latest timestamps at once:
```
select last id, status, temperature from root.ln.wf01.wt01 disable align
| Time | Path | Value |
| --- | ---------------------------- | ----- |
| 5 | root.ln.wf01.wt01.id | 10 |
| 7 | root.ln.wf01.wt01.status | true |
| 9 | root.ln.wf01.wt01.temperature| 35.7 |
```


## Data maintenance

@@ -446,6 +446,40 @@ root.sg1.d0.s0 is INT32 while root.sg2.d3.s0 is FLOAT.
```

* Last statement

The Last statement returns the record with the latest timestamp of each queried time series.

```
SELECT LAST <SelectClause> FROM <FromClause> <DisableAlignClause>
Select Clause : <Path> [COMMA <Path>]*
FromClause : < PrefixPath > [COMMA < PrefixPath >]*
DisableAlignClause : [DISABLE ALIGN]
Eg. SELECT LAST s1 FROM root.sg.d1 disable align
Eg. SELECT LAST s1, s2 FROM root.sg.d1 disable align
Eg. SELECT LAST s1 FROM root.sg.d1, root.sg.d2 disable align
Rules:
1. PrefixPath.Path must form a complete time series, i.e. <PrefixPath> + <Path> = <Timeseries>
2. SELECT LAST statements do not support filter conditions.
3. The result set is returned in "disable align" form, i.e. always as a table with three columns.
For example, "select last s1, s2 from root.sg.d1, root.sg.d2 disable align" returns:
| Time | Path | Value |
| --- | ------------ | ----- |
| 5 | root.sg.d1.s1| 100 |
| 2 | root.sg.d1.s2| 400 |
| 4 | root.sg.d2.s1| 250 |
| 9 | root.sg.d2.s2| 600 |
4. A SELECT LAST query is always meant to be used together with a trailing "disable align". If a user is unfamiliar with the SELECT LAST syntax or forgets to append "disable align", IoTDB still accepts the statement without "disable align" and returns the result set in "disable align" form anyway.
For example, "select last s1 from root.sg.d1" yields exactly the same result as "select last s1 from root.sg.d1 disable align".
```

### Database management statements

* Create user
@@ -243,6 +243,41 @@ The path after SELECT in GROUP BY statement must be aggregate function, otherwis

<center><img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/16079446/69116099-0b715300-0ac6-11ea-8074-84e04797b8c7.png"></center>

### Last Timestamp Query
In scenarios where IoT devices update data rapidly, users are often more interested in a device's most recent record.
The LAST query returns the most recent value of the given timeseries in a time-value pair format.

The SQL statement is:

```
select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <DISABLE ALIGN>
```
which means:

Query and return the data with the largest timestamp of timeseries prefixPath.path.

In the following example, we query the latest record of the timeseries root.ln.wf01.wt01.status:
```
select last status from root.ln.wf01.wt01 disable align
```
The result is returned in a three-column table format.
```
| Time | Path | Value |
| --- | ----------------------- | ----- |
| 5 | root.ln.wf01.wt01.status| 100 |
```
If the path root.ln.wf01.wt01 has multiple columns, for example id, status and temperature, the following query returns the latest-timestamp records of all three measurements.
```
select last id, status, temperature from root.ln.wf01.wt01 disable align
| Time | Path | Value |
| --- | ---------------------------- | ----- |
| 5 | root.ln.wf01.wt01.id | 10 |
| 7 | root.ln.wf01.wt01.status | true |
| 9 | root.ln.wf01.wt01.temperature| 35.7 |
```
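
As a usage illustration, a LAST query can be issued like any other SQL statement, for example through the IoTDB JDBC driver (a minimal sketch assuming a local server with default credentials and the three-column result shown above):
```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LastQueryExample {
  public static void main(String[] args) throws Exception {
    Class.forName("org.apache.iotdb.jdbc.IoTDBDriver");
    try (Connection connection =
            DriverManager.getConnection("jdbc:iotdb://127.0.0.1:6667/", "root", "root");
        Statement statement = connection.createStatement();
        ResultSet resultSet =
            statement.executeQuery("select last status from root.ln.wf01.wt01 disable align")) {
      while (resultSet.next()) {
        // the disable-align result set has three columns: Time, Path, Value
        System.out.println(resultSet.getString("Time") + "\t"
            + resultSet.getString("Path") + "\t" + resultSet.getString("Value"));
      }
    }
  }
}
```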


### Automated Fill

In actual use of IoTDB, a timeseries query may return null values at some time points, which obstructs users' further analysis. To better reflect the degree of data change, users expect missing values to be filled in automatically. Therefore, the IoTDB system introduces the Automated Fill function.
33 changes: 33 additions & 0 deletions docs/Documentation/UserGuide/5-Operation Manual/4-SQL Reference.md
@@ -457,6 +457,39 @@
```

* Select Last Record Statement

The LAST statement returns the last (most recent) time-value pair of the given timeseries. Filters are currently not supported in LAST queries.

```
SELECT LAST <SelectClause> FROM <FromClause> <DisableAlignClause>
Select Clause : <Path> [COMMA <Path>]*
FromClause : < PrefixPath > [COMMA < PrefixPath >]*
DisableAlignClause : [DISABLE ALIGN]
Eg. SELECT LAST s1 FROM root.sg.d1 disable align
Eg. SELECT LAST s1, s2 FROM root.sg.d1 disable align
Eg. SELECT LAST s1 FROM root.sg.d1, root.sg.d2 disable align
Rules:
1. The statement needs to satisfy this constraint: <PrefixPath> + <Path> = <Timeseries>
2. The result set of a LAST query is always displayed in the "disable align" format shown below.
For example, for "select last s1, s2 from root.sg.d1, root.sg.d2 disable align", the query result would be:
| Time | Path | Value |
| --- | ------------ | ----- |
| 5 | root.sg.d1.s1| 100 |
| 2 | root.sg.d1.s2| 400 |
| 4 | root.sg.d2.s1| 250 |
| 9 | root.sg.d2.s2| 600 |
3. The LAST query syntax expects users to write a "disable align" clause at the end of the query.
However, as this is a syntax unique to IoTDB, IoTDB also accepts LAST queries without "disable align" and treats them as "disable align" ones.
A query like "select last s1 from root.sg.d1" is parsed exactly the same as "select last s1 from root.sg.d1 disable align".
```

### Database Management Statement

* Create User
