Skip to content

Commit

Permalink
merge last
Browse files Browse the repository at this point in the history
  • Loading branch information
JackieTien97 committed Mar 3, 2020
2 parents cf31d9c + c00b63f commit b3f035b
Show file tree
Hide file tree
Showing 89 changed files with 2,316 additions and 952 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,4 @@
* [原始数据查询](/#/SystemDesign/progress/chap5/sec3)
* [聚合查询](/#/SystemDesign/progress/chap5/sec4)
* [降采样查询](/#/SystemDesign/progress/chap5/sec5)
* [最近时间戳查询](/#/SystemDesign/progress/chap5/sec6)
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {

当前第一个chunk meta data的引用

* private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
* private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
new PriorityQueue<>(
Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));

Expand Down Expand Up @@ -272,35 +272,35 @@ while (aggregateReader.hasNextChunk()) {

这个方法判断是否有下一个Page,一般在`firstChunkMetaData`不可直接使用时,继续解成Page。

首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`overlappedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false``firstChunkMetaData`置为`null`。若`overlappedPageReaders`为空则返回`false`,若不为空,返回`true`
首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`cachedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false``firstChunkMetaData`置为`null`。若`cachedPageReaders`为空则返回`false`,若不为空,返回`true`

#### isPageOverlapped()

这个方法判断当前的Page有没有其他与之重叠的Page存在。

如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`
如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`

#### nextPage()

须与`isPageOverlapped()`方法搭配使用。

`overlappedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`overlappedPageReaders`的第一个Page里符合过滤条件的所有data。
`cachedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`cachedPageReaders`的第一个Page里符合过滤条件的所有data。

#### currentPageStatistics()

返回`overlappedPageReaders`里第一个Page的统计信息。
返回`cachedPageReaders`里第一个Page的统计信息。

#### skipCurrentPage()

跳过当前Page。只需要将`overlappedPageReaders`里第一个PageReader删掉即可。
跳过当前Page。只需要将`cachedPageReaders`里第一个PageReader删掉即可。

#### hasNextOverlappedPage()

这个方法判断当前还有没有重叠的Page。

如果`hasCachedNextBatch``true`,直接返回`true`

否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`overlappedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。
否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`cachedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。

然后先从`mergeReader`里取出当前最大的结束时间,作为此次所要返回的batch的结束时间,记为`currentPageEndTime`。接着去遍历`mergeReader`,直到当前的时间戳大于`currentPageEndTime`

Expand Down Expand Up @@ -411,4 +411,4 @@ return false;

#### canUseCurrentPageStatistics()

先调用`SeriesReader``currentPageStatistics()`方法,获得当前page的统计信息,再调用`SeriesReader``isPageOverlapped()`方法判断当前page是否重叠,如果当前page不重叠,且其统计信息满足过滤条件,则返回`true`,否则返回`false`
先调用`SeriesReader``currentPageStatistics()`方法,获得当前page的统计信息,再调用`SeriesReader``isPageOverlapped()`方法判断当前page是否重叠,如果当前page不重叠,且其统计信息满足过滤条件,则返回`true`,否则返回`false`
119 changes: 119 additions & 0 deletions docs/Documentation-CHN/SystemDesign/5-DataQuery/6-LastQuery.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# 最近时间戳 Last 查询

Last 查询的主要逻辑在 LastQueryExecutor

* org.apache.iotdb.db.query.executor.LastQueryExecutor

Last查询对每个指定的时间序列执行`calculateLastPairForOneSeries`方法。

## 读取MNode缓存数据

我们在需要查询的时间序列所对应的MNode结构中添加Last数据缓存。`calculateLastPairForOneSeries`方法对于某个时间序列的Last查询,首先尝试读取MNode中的缓存数据。
```
try {
node = MManager.getInstance().getDeviceNodeWithAutoCreateStorageGroup(seriesPath.toString());
} catch (MetadataException e) {
throw new QueryProcessException(e);
}
if (((LeafMNode) node).getCachedLast() != null) {
return ((LeafMNode) node).getCachedLast();
}
```
如果发现缓存没有被写入过,则执行下面的标准查询流程读取TsFile数据。

## Last标准查询流程

Last标准查询流程需要遍历所有的顺序文件和乱序文件得到查询结果,最后将查询结果写回到MNode缓存。算法中对顺序文件和乱序文件分别进行处理。
- 顺序文件由于是对其写入时间已经排好序,因此直接使用`loadChunkMetadataFromTsFileResource`方法取出最后一个`ChunkMetadata`,通过`ChunkMetadata`的统计数据得到最大时间戳和对应的值。
```
if (!seqFileResources.isEmpty()) {
List<ChunkMetaData> chunkMetadata =
FileLoaderUtils.loadChunkMetadataFromTsFileResource(
seqFileResources.get(seqFileResources.size() - 1), seriesPath, context);
if (!chunkMetadata.isEmpty()) {
ChunkMetaData lastChunkMetaData = chunkMetadata.get(chunkMetadata.size() - 1);
Statistics chunkStatistics = lastChunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
}
}
```
- 乱序文件则需要遍历所有的`ChunkMetadata`结构得到最大时间戳数据。需要注意的是当多个`ChunkMetadata`拥有相同的时间戳时,我们取`version`值最大的`ChunkMatadata`中的数据作为Last的结果。

```
long version = 0;
for (TsFileResource resource : unseqFileResources) {
if (resource.getEndTimeMap().get(seriesPath.getDevice()) < resultPair.getTimestamp()) {
break;
}
List<ChunkMetaData> chunkMetadata =
FileLoaderUtils.loadChunkMetadataFromTsFileResource(resource, seriesPath, context);
for (ChunkMetaData chunkMetaData : chunkMetadata) {
if (chunkMetaData.getEndTime() == resultPair.getTimestamp()
&& chunkMetaData.getVersion() > version) {
Statistics chunkStatistics = chunkMetaData.getStatistics();
resultPair =
constructLastPair(
chunkStatistics.getEndTime(), chunkStatistics.getLastValue(), tsDataType);
version = chunkMetaData.getVersion();
}
}
}
```
- 最后将查询结果写入到MNode的Last缓存
```
((LeafMNode) node).updateCachedLast(resultPair, false, Long.MIN_VALUE);
```

## Last 缓存更新策略

Last缓存更新的逻辑位于`LeafMNode``updateCachedLast`方法内,这里引入两个额外的参数`highPriorityUpdate``latestFlushTime``highPriorityUpdate`用来表示本次更新是否是高优先级的,新数据写入而导致的缓存更新都被认为是高优先级更新,而查询时更新缓存默认为低优先级更新。`latestFlushTime`用来记录当前已被写回到磁盘的数据的最大时间戳。

缓存更新的策略如下:

1. 当缓存中没有记录时,对于查询到的Last数据,将查询的结果直接写入到缓存中。
2. 当缓存中没有记录时,对于写入的最新数据如果时间戳大于或等于`latestFlushTime`,则将写入的数据写入到缓存中。
3. 当缓存中已有记录时,根据查询或写入的数据时间戳与当前缓存中时间戳作对比。写入的数据具有高优先级,时间戳不小于缓存记录则更新缓存;查询出的数据低优先级,必须大于缓存记录的时间戳才更新缓存。

具体代码如下
```
public synchronized void updateCachedLast(
TimeValuePair timeValuePair, boolean highPriorityUpdate, Long latestFlushedTime) {
if (timeValuePair == null || timeValuePair.getValue() == null) return;
if (cachedLastValuePair == null) {
// If no cached last, (1) a last query (2) an unseq insertion or (3) a seq insertion will update cache.
if (!highPriorityUpdate || latestFlushedTime <= timeValuePair.getTimestamp()) {
cachedLastValuePair =
new TimeValuePair(timeValuePair.getTimestamp(), timeValuePair.getValue());
}
} else if (timeValuePair.getTimestamp() > cachedLastValuePair.getTimestamp()
|| (timeValuePair.getTimestamp() == cachedLastValuePair.getTimestamp()
&& highPriorityUpdate)) {
cachedLastValuePair.setTimestamp(timeValuePair.getTimestamp());
cachedLastValuePair.setValue(timeValuePair.getValue());
}
}
```
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,39 @@ GROUP BY的SELECT子句里的查询路径必须是聚合函数,否则系统将

<center><img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/16079446/69116099-0b715300-0ac6-11ea-8074-84e04797b8c7.png"></center>

### 最近时间戳数据查询

对应的SQL语句是:

```
select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <DISABLE ALIGN>
```
其含义是:

查询时间序列prefixPath.path中最近时间戳的数据

下面的例子中查询时间序列root.ln.wf01.wt01.status最近时间戳的数据:
```
select last status from root.ln.wf01.wt01 disable align
```
结果集为以下的形式返回:
```
| Time | Path | Value |
| --- | ----------------------- | ----- |
| 5 | root.ln.wf01.wt01.status| 100 |
```

假设root.ln.wf01.wt01中包含多列数据,如id, status, temperature,下面的例子将会把这几列数据在最近时间戳的记录同时返回:
```
select last id, status, temperature from root.ln.wf01 disable align
| Time | Path | Value |
| --- | ---------------------------- | ----- |
| 5 | root.ln.wf01.wt01.id | 10 |
| 7 | root.ln.wf01.wt01.status | true |
| 9 | root.ln.wf01.wt01.temperature| 35.7 |
```


## 数据维护

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,23 @@ It costs 0.417s
* 设置存储组

``` SQL
SET STORAGE GROUP TO <PrefixPath>
SET STORAGE GROUP TO <FullPath>
Eg: IoTDB > SET STORAGE GROUP TO root.ln.wf01.wt01
Note: PrefixPath can not include `*`
Note: FullPath can not include `*`
```
* 删除存储组

```
DELETE STORAGE GROUP <PrefixPath> [COMMA <PrefixPath>]*
DELETE STORAGE GROUP <FullPath> [COMMA <FullPath>]*
Eg: IoTDB > DELETE STORAGE GROUP root.ln.wf01.wt01
Eg: IoTDB > DELETE STORAGE GROUP root.ln.wf01.wt01, root.ln.wf01.wt02
Note: PrefixPath can not include `*`
Note: FullPath can not include `*`
```

* 创建时间序列语句

```
CREATE TIMESERIES <Timeseries> WITH <AttributeClauses>
CREATE TIMESERIES <FullPath> WITH <AttributeClauses>
AttributeClauses : DATATYPE=<DataTypeValue> COMMA ENCODING=<EncodingValue> [COMMA <ExtraAttributeClause>]*
DataTypeValue: BOOLEAN | DOUBLE | FLOAT | INT32 | INT64 | TEXT
EncodingValue: GORILLA | PLAIN | RLE | TS_2DIFF | REGULAR
Expand Down Expand Up @@ -470,6 +470,40 @@ root.sg1.d0.s0 is INT32 while root.sg2.d3.s0 is FLOAT.
```

* Last语句

Last 语句返回所要查询时间序列的最近时间戳的一条数据

```
SELECT LAST <SelectClause> FROM <FromClause> <DisableAlignClause>
Select Clause : <Path> [COMMA <Path>]*
FromClause : < PrefixPath > [COMMA < PrefixPath >]*
DisableAlignClause : [DISABLE ALIGN]
Eg. SELECT LAST s1 FROM root.sg.d1 disable align
Eg. SELECT LAST s1, s2 FROM root.sg.d1 disable align
Eg. SELECT LAST s1 FROM root.sg.d1, root.sg.d2 disable align
规则:
1. 需要满足PrefixPath.Path 为一条完整的时间序列,即 <PrefixPath> + <Path> = <Timeseries>
2. SELECT LAST 语句不支持过滤条件.
3. 结果集以"disable align"的形式返回,表现为总是包含三列的表格。
例如 "select last s1, s2 from root.sg.d1, root.sg.d2 disable align", 结果集返回如下:
| Time | Path | Value |
| --- | ------------ | ----- |
| 5 | root.sg.d1.s1| 100 |
| 2 | root.sg.d1.s2| 400 |
| 4 | root.sg.d2.s1| 250 |
| 9 | root.sg.d2.s2| 600 |
4. SELECT LAST 查询语句要是总是和末尾的disable align在一起使用。如果用户不熟悉SELECT LAST的语法或者忘记在末尾添加"disable align",IoTDB 也会接受不包含"disable align"的SQL语句并且仍以"disable align"的形式返回结果集。
例如用户输入 "select last s1 from root.sg.d1" 所得到的查询结果与 "select last s1 from root.sg.d1 disable align". 的结果是完全相同的。
```

### 数据库管理语句

* 创建用户
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Msg: org.apache.iotdb.exception.MetadataErrorException: org.apache.iotdb.excepti
```

### Show Storage Group

After the storage group is created, we can use the [SHOW STORAGE GROUP](/#/Documents/progress/chap5/sec4) statement to view all the storage groups. The SQL statement is as follows:

```
Expand All @@ -51,6 +52,7 @@ The result is as follows:


### Create Timeseries

According to the storage model selected before, we can create corresponding timeseries in the two storage groups respectively. The SQL statements for creating timeseries are as follows:

```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,41 @@ The path after SELECT in GROUP BY statement must be aggregate function, otherwis

<center><img style="width:100%; max-width:800px; max-height:600px; margin-left:auto; margin-right:auto; display:block;" src="https://user-images.githubusercontent.com/16079446/69116099-0b715300-0ac6-11ea-8074-84e04797b8c7.png"></center>

### Last timestamp Query
In scenarios when IoT devices updates data in a fast manner, users are more interested in the most recent record of IoT devices.
The LAST query is to return the most recent value of the given timeseries in a time-value pair format.

The SQL statement is:

```
select last <Path> [COMMA <Path>]* from < PrefixPath > [COMMA < PrefixPath >]* <DISABLE ALIGN>
```
which means:

Query and return the data with the largest timestamp of timeseries prefixPath.path.

In the following example, we queries the latest record of timeseries root.ln.wf01.wt01.status:
```
select last status from root.ln.wf01.wt01 disable align
```
The result will be returned in a three column table format.
```
| Time | Path | Value |
| --- | ----------------------- | ----- |
| 5 | root.ln.wf01.wt01.status| 100 |
```
If the path root.ln.wf01.wt01 has multiple columns, for example id, status and temperature, the following case will return records of all the three measurements with the largest timestamp.
```
select last id, status, temperature from root.ln.wf01.wt01 disable align
| Time | Path | Value |
| --- | ---------------------------- | ----- |
| 5 | root.ln.wf01.wt01.id | 10 |
| 7 | root.ln.wf01.wt01.status | true |
| 9 | root.ln.wf01.wt01.temperature| 35.7 |
```


### Automated Fill

In the actual use of IoTDB, when doing the query operation of timeseries, situations where the value is null at some time points may appear, which will obstruct the further analysis by users. In order to better reflect the degree of data change, users expect missing values to be automatically filled. Therefore, the IoTDB system introduces the function of Automated Fill.
Expand Down

0 comments on commit b3f035b

Please sign in to comment.