Skip to content

Commit

Permalink
Merge ad50583 into 6f80172
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Qiao committed Mar 2, 2020
2 parents 6f80172 + ad50583 commit cae5471
Show file tree
Hide file tree
Showing 48 changed files with 977 additions and 705 deletions.
Expand Up @@ -212,7 +212,7 @@ while (aggregateReader.hasNextChunk()) {

当前第一个chunk meta data的引用

* private PriorityQueue<VersionPair<IPageReader>> overlappedPageReaders =
* private PriorityQueue<VersionPair<IPageReader>> cachedPageReaders =
new PriorityQueue<>(
Comparator.comparingLong(pageReader -> pageReader.data.getStatistics().getStartTime()));

Expand Down Expand Up @@ -272,35 +272,35 @@ while (aggregateReader.hasNextChunk()) {

这个方法判断是否有下一个Page,一般在`firstChunkMetaData`不可直接使用时,继续解成Page。

首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`overlappedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false``firstChunkMetaData`置为`null`。若`overlappedPageReaders`为空则返回`false`,若不为空,返回`true`
首先调用`fillOverlappedPageReaders()`去将`firstChunkMetaData`解开为`PageReader`,解开的`PageReader`都放进`cachedPageReaders`里。并将`hasCachedFirstChunkMetadata`置为`false``firstChunkMetaData`置为`null`。若`cachedPageReaders`为空则返回`false`,若不为空,返回`true`

#### isPageOverlapped()

这个方法判断当前的Page有没有其他与之重叠的Page存在。

如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`overlappedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`
如果`mergeReader`里仍然有数据,或者`seqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,或者`unseqChunkMetadatas`里有与`cachedPageReaders`里第一个`PageReader`时间重叠的,则返回`true`;反之,返回`false`

#### nextPage()

须与`isPageOverlapped()`方法搭配使用。

`overlappedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`overlappedPageReaders`的第一个Page里符合过滤条件的所有data。
`cachedPageReaders`里第一个Page没有与之重叠的其他Page时,直接获得`cachedPageReaders`的第一个Page里符合过滤条件的所有data。

#### currentPageStatistics()

返回`overlappedPageReaders`里第一个Page的统计信息。
返回`cachedPageReaders`里第一个Page的统计信息。

#### skipCurrentPage()

跳过当前Page。只需要将`overlappedPageReaders`里第一个PageReader删掉即可。
跳过当前Page。只需要将`cachedPageReaders`里第一个PageReader删掉即可。

#### hasNextOverlappedPage()

这个方法判断当前还有没有重叠的Page。

如果`hasCachedNextBatch``true`,直接返回`true`

否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`overlappedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。
否则,先调用`putAllDirectlyOverlappedPageReadersIntoMergeReader()`方法,将所有与`cachedPageReaders`第一个Page有重叠的PageReader放进`mergeReader`里。`mergeReader`里维护了一个`currentLargestEndTime`变量,每次add进新的Reader时被更新,用以记录当前添加进`mergeReader`的最大的结束时间。

然后先从`mergeReader`里取出当前最大的结束时间,作为此次所要返回的batch的结束时间,记为`currentPageEndTime`。接着去遍历`mergeReader`,直到当前的时间戳大于`currentPageEndTime`

Expand Down
Expand Up @@ -74,11 +74,12 @@ public abstract void updateResultFromStatistics(Statistics statistics)
* @param dataInThisPage the data in Page
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage) throws IOException;

/**
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
* @param dataInThisPage the data in Page
* @param bound calculate points whose time < bound
* @param bound calculate points whose time < bound
*/
public abstract void updateResultFromPageData(BatchData dataInThisPage, long bound)
throws IOException;
Expand Down Expand Up @@ -170,6 +171,12 @@ public void serializeTo(OutputStream outputStream) throws IOException {

public void reset() {
hasResult = false;
booleanValue = false;
doubleValue = 0;
floatValue = 0;
intValue = 0;
longValue = 0;
binaryValue = null;
}

protected Object getValue() {
Expand Down
Expand Up @@ -143,4 +143,10 @@ public long getCnt() {
return cnt;
}

@Override
public void reset() {
super.reset();
cnt = 0;
avg = 0;
}
}

0 comments on commit cae5471

Please sign in to comment.