Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[IOTDB-527] Refactor series reader #864

Merged
merged 32 commits into from Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
9aa6176
reorganize series reader and aggregate reader
qiaojialin Feb 29, 2020
d229e98
fix point reader
qiaojialin Feb 29, 2020
18068a9
fix bytimestamp
qiaojialin Feb 29, 2020
1dadf07
combine nextPage (#861)
liutaohua Mar 1, 2020
fe759e8
remove set chunkmetadata=null in hasNextChunk
qiaojialin Mar 1, 2020
17365a7
remove hasOverlappedPage
qiaojialin Mar 1, 2020
34476e5
new groupby (#862)
liutaohua Mar 1, 2020
986f758
fix hasNextPage in SeriesReader
qiaojialin Mar 1, 2020
b7c10da
remove unused interfaces in IAggregateReader
qiaojialin Mar 1, 2020
3b33fbb
push down value filter and add log
qiaojialin Mar 2, 2020
e51e07e
fix nextPage non-stop
qiaojialin Mar 2, 2020
0114481
make hasNextPage and nextPage more robust
qiaojialin Mar 2, 2020
d9a19dc
add stack
qiaojialin Mar 2, 2020
6ad42e9
add mergeReaer check in isPageOverlapped
qiaojialin Mar 2, 2020
b725c72
fix hasNextOverlappedPage when no data in this page under value filter
qiaojialin Mar 2, 2020
1c39133
Merge remote-tracking branch 'origin/master' into optimize_series_reader
qiaojialin Mar 2, 2020
9e85181
add log
qiaojialin Mar 2, 2020
f06043b
add filter overlapped data test and add log
qiaojialin Mar 2, 2020
3e6a61f
add log for point 100492
qiaojialin Mar 2, 2020
46a133b
add log in hasNextOverlappedPage
qiaojialin Mar 2, 2020
6ba2969
Optimize_series_reader_fix (#867)
liutaohua Mar 2, 2020
80456bd
add log
qiaojialin Mar 2, 2020
23d620a
Merge remote-tracking branch 'origin/optimize_series_reader' into opt…
qiaojialin Mar 2, 2020
3954860
fix tmp bug
qiaojialin Mar 2, 2020
2d6e8fc
remove debug log
qiaojialin Mar 2, 2020
60c0f5e
add test and fix review
qiaojialin Mar 2, 2020
ad50583
fix code smell
qiaojialin Mar 2, 2020
8f5cf8b
Update server/src/test/java/org/apache/iotdb/db/integration/IoTDBClos…
JackieTien97 Mar 2, 2020
e455a96
fix review
qiaojialin Mar 2, 2020
2843e8c
Merge remote-tracking branch 'origin/optimize_series_reader' into opt…
qiaojialin Mar 2, 2020
7a28519
fix import
qiaojialin Mar 2, 2020
9101bcf
fix review
qiaojialin Mar 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {

当前第一个chunk meta data的引用

* private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
* private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
new PriorityQueue<>(
Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));

Expand Down Expand Up @@ -272,35 +272,35 @@ while (aggregateReader.hasNextChunk()) {

这个方法判断是否有下一个Page,一般在`firstChunkMetaData`不可直接使用时,继续解成Page。

首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`overlappedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false`,`firstChunkMetaData`置为`null`。若`overlappedPageReaders`为空则返回`false`,若不为空,返回`true`。
首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`cachedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false`,`firstChunkMetaData`置为`null`。若`cachedPageReaders`为空则返回`false`,若不为空,返回`true`。

#### isPageOverlapped()

这个方法判断当前的Page有没有其他与之重叠的Page存在。

如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`。
如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`。

#### nextPage()

须与`isPageOverlapped()`方法搭配使用。

当`overlappedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`overlappedPageReaders`的第一个Page里符合过滤条件的所有data。
当`cachedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`cachedPageReaders`的第一个Page里符合过滤条件的所有data。

#### currentPageStatistics()

返回`overlappedPageReaders`里第一个Page的统计信息。
返回`cachedPageReaders`里第一个Page的统计信息。

#### skipCurrentPage()

跳过当前Page。只需要将`overlappedPageReaders`里第一个PageReader删掉即可。
跳过当前Page。只需要将`cachedPageReaders`里第一个PageReader删掉即可。

#### hasNextOverlappedPage()

这个方法判断当前还有没有重叠的Page。

如果`hasCachedNextBatch`为`true`,直接返回`true`。

否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`overlappedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。
否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`cachedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。

然后先从`mergeReader`里取出当前最大的结束时间,作为此次所要返回的batch的结束时间,记为`currentPageEndTime`。接着去遍历`mergeReader`,直到当前的时间戳大于`currentPageEndTime`。

Expand Down
Expand Up @@ -74,11 +74,12 @@ public abstract void updateResultFromStatistics(Statistics statistics)
* @param dataInThisPage the data in Page
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage) throws IOException;

/**
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
* @param dataInThisPage the data in Page
* @param bound calculate points whose time < bound
* @param bound calculate points whose time < bound
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage, long bound)
throws IOException;
Expand Down Expand Up @@ -170,6 +171,12 @@ public void serializeTo(OutputStream outputStream) throws IOException {

public void reset() {
hasResult = false;
booleanValue = false;
doubleValue = 0;
floatValue = 0;
intValue = 0;
longValue = 0;
binaryValue = null;
}

protected Object getValue() {
Expand Down
Expand Up @@ -143,4 +143,10 @@ public long getCnt() {
return cnt;
}

@Override
public void reset() {
super.reset();
cnt = 0;
avg = 0;
}
}