
Merge 848394a into c00b63f
Jialin Qiao committed Mar 3, 2020
2 parents c00b63f + 848394a commit b61432f
Showing 9 changed files with 352 additions and 397 deletions.
410 changes: 190 additions & 220 deletions docs/Documentation-CHN/SystemDesign/5-DataQuery/2-SeriesReader.md

Large diffs are not rendered by default.

@@ -63,15 +63,11 @@ while (aggregateReader.hasNextChunk()) {
      aggregateReader.skipCurrentPage();
      continue;
    }
-   } else {
-     BatchData batchData = aggregateReader.nextPage();
-     // do some aggregate calculation using batch data
+   // iterate over all overlapped pages
+   while (aggregateReader.hasNextOverlappedPage()) {
+     BatchData batchData = aggregateReader.nextOverlappedPage();
+     // do some aggregate calculation using batch data
      ...
    }
  }
}
```
121 changes: 57 additions & 64 deletions docs/Documentation-CHN/SystemDesign/5-DataQuery/5-GroupByQuery.md
@@ -62,87 +62,80 @@ protected boolean hasNextWithoutConstraint() {

The logic of a downsampling query without value filter lives mainly in the `GroupByWithoutValueFilterDataSet` class, which extends `GroupByEngineDataSet`.


This class has the following key fields:
-* private Map<Path, List<Integer>> pathToAggrIndexesMap: like the `pathToAggrIndexesMap` in aggregation queries; each entry is the query of one series
-* private Map<Path, IAggregateReader> aggregateReaders: maps each queried series to its reader
-* private List<BatchData> cachedBatchDataList: the list of cached batchData
-* private Filter timeFilter
-* private GroupByPlan groupByPlan
+* private Map<Path, GroupByExecutor> pathExecutors: groups the aggregate functions that share a `Path` and wraps each group into a `GroupByExecutor`;
+  `GroupByExecutor` encapsulates the data-calculation logic and methods of each `Path`, described below

+* private TimeRange timeRange: the time interval of each calculation, wrapped as an object and used to decide whether a `Statistics` can directly take part in the calculation
+* private Filter timeFilter: the user-defined query interval, turned into a `Filter` object that filters the available files, chunks, and pages

-First, in the initialization method `initGroupBy()`, the `timeFilter` is computed from the expression, and `pathToAggrIndexesMap` and `aggregateReaders` are built from each `path`; every `SeriesAggregateReader` in the `aggregateReaders` list reads one time series.
+First, in the initialization method `initGroupBy()`, the `timeFilter` is computed from the expression and a `GroupByExecutor` is generated for each `path`, as sketched below.

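Sketched below is what the new `initGroupBy()` plausibly does; apart from the names quoted above, identifiers such as `AggregateResultFactory` and the constructor signature of `GroupByExecutor` are assumptions, not the verbatim committed code:
```
// hypothetical sketch: derive the time filter from the expression,
// then create one GroupByExecutor per distinct path
private void initGroupBy(QueryContext context, GroupByPlan groupByPlan)
    throws StorageEngineException, QueryProcessException {
  IExpression expression = groupByPlan.getExpression();
  if (expression instanceof GlobalTimeExpression) {
    // the user-defined query interval becomes the timeFilter
    timeFilter = ((GlobalTimeExpression) expression).getFilter();
  }
  for (int i = 0; i < paths.size(); i++) {
    Path path = paths.get(i);
    if (!pathExecutors.containsKey(path)) {
      // one executor per distinct path; it owns the SeriesAggregateReader
      pathExecutors.put(path,
          new GroupByExecutor(path, dataTypes.get(i), context, timeFilter));
    }
    // register the i-th aggregation, remembering the column index i
    // so that the user-defined column order can be restored later
    AggregateResult aggrResult = AggregateResultFactory.getAggrResultByName(
        groupByPlan.getDeduplicatedAggregations().get(i), dataTypes.get(i));
    pathExecutors.get(path).addAggregateResult(aggrResult, i);
  }
}
```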
-The `nextWithoutConstraint()` method computes the aggregated values `aggregateResults` by calling `nextIntervalAggregation()`. Before the result set is converted into RowRecords, the results must be restored to the order of the user's query. The following method converts the result list into a RowRecord; note that when the list contains no result, a `Field` of type `null` is added to the RowRecord.
+The `nextWithoutConstraint()` method computes, by calling `GroupByExecutor.calcResult()`, the aggregated values `aggregateResults` of all aggregate functions within each `Path`.
+The following method converts the result list into a RowRecord; note that when the list contains no result, `null` is added to the RowRecord.
```
-if (aggregateResultList.length == 0) {
-  record.addField(new Field(null));
-} else {
-  for (AggregateResult res : aggregateResultList) {
-    record.addField(res.getResult(), res.getDataType());
-  }
-}
+for (AggregateResult res : fields) {
+  if (res == null) {
+    record.addField(null);
+    continue;
+  }
+  record.addField(res.getResult(), res.getResultDataType());
+}
```

-Now the `nextIntervalAggregation()` method in detail. As in the `groupAggregationsBySeries()` method of the aggregation-query flow in section 5.4, for each entry (i.e. series) an `AggregateResult` is first created for each of its aggregate queries; at the same time a boolean list `isCalculatedList` records whether each `AggregateResult` has been fully calculated, and `remainingToCalculate` counts the aggregate functions still to be calculated. The boolean list and this counter let some aggregate functions (such as `FIRST_VALUE`) stop going through the whole loop once their result is obtained.

-The `IAggregateReader` of the queried time series path is obtained from `aggregateReaders`; then, following the usage of `aggregateReader` described in section 5.2, the `AggregateResult` is updated.
### GroupByExecutor
Encapsulates the calculation of all aggregate functions under the same path. The class has the following key fields:
* private IAggregateReader reader: the `SeriesAggregateReader` used to read the data of the current `Path`
* private BatchData preCachedData: each read from the `Reader` returns a whole batch, which may well extend beyond the current time interval; such a `BatchData` is cached and kept for the next round
* private List<Pair<AggregateResult, Integer>> results: stores all aggregate methods of the current `Path`;
  for example, in `select count(a), sum(a), avg(b)`, the `count` and `sum` methods are stored together.
  The `Integer` on the right is used to restore the results to the user's query order before the result set is converted into RowRecords, as sketched below.
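A small sketch of that order restoration (the variable names `fields` and `totalColumnCount` are illustrative assumptions):
```
// scatter per-path results back into the user's column order;
// each Pair carries an AggregateResult and its original column index
AggregateResult[] fields = new AggregateResult[totalColumnCount];
for (Map.Entry<Path, GroupByExecutor> entry : pathExecutors.entrySet()) {
  for (Pair<AggregateResult, Integer> result : entry.getValue().calcResult()) {
    fields[result.right] = result.left;
  }
}
```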

#### Key methods

```
-while (aggregateReader.hasNextChunk()) {
-  if (aggregateReader.canUseCurrentChunkStatistics()) {
-    Statistics chunkStatistics = aggregateReader.currentChunkStatistics();
-    // do some aggregate calculation using chunk statistics
-    ...
-    aggregateReader.skipCurrentChunk();
-    continue;
-  }
-  while (aggregateReader.hasNextPage()) {
-    if (aggregateReader.canUseCurrentPageStatistics()) {
-      Statistics pageStatistic = aggregateReader.currentPageStatistics();
-      // do some aggregate calculation using page statistics
-      ...
-      aggregateReader.skipCurrentPage();
-      continue;
-    }
-    // iterate over all overlapped pages
-    while (aggregateReader.hasNextOverlappedPage()) {
-      BatchData batchData = aggregateReader.nextOverlappedPage();
-      // do some aggregate calculation using batch data
-      ...
-    }
-  }
-}
```
-The `calcBatchData()` method computes aggregate results from a batchData. While `batchData.hasCurrent() && batchData.currentTime() < curStartTime` holds, `batchData.next()` must be called repeatedly to reach the qualifying segment of the batchData. After an `AggregateResult` is updated, and because obtaining each aggregate function's result traverses the whole batchData, `resetBatchData()` must be called to point the cursor back to the start so that the next function can traverse it.
-Note that before each result is updated, it must first be checked (via the `isCalculatedList` list) whether it has already been fully calculated; after every update, `isCalculatedAggregationResult()` is called to refresh the boolean in the list. Once all values in the list are true, i.e. `remainingToCalculate` is 0, all aggregate function results have been calculated and the method can return.
```
//add an aggregation of the current path
private void addAggregateResult(AggregateResult aggrResult, int index);
//check whether the current path has finished all aggregate calculations
private boolean isEndCalc();
//calculate results from the cached BatchData that the previous round did not use up
private boolean calcFromCacheData() throws IOException;
//calculate using BatchData
private void calcFromBatch(BatchData batchData) throws IOException;
//calculate results directly from the Statistics of a Page or Chunk
private void calcFromStatistics(Statistics statistics) throws QueryProcessException;
//read data from the reader and calculate
private List<Pair<AggregateResult, Integer>> calcResult() throws IOException, QueryProcessException;
//clear all calculation results
private void resetAggregateResults();
//traverse and calculate the data in pages
private boolean readAndCalcFromPage() throws IOException, QueryProcessException;
```
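Of the methods above, `readAndCalcFromPage()` is only listed, not shown. Here is a minimal sketch of its page-level cascade, inferred by mirroring the chunk-level `calcResult()` shown further below; the structure is an assumption rather than the committed implementation:
```
// assumed sketch: consume the pages of the current chunk; use page
// statistics when safe, otherwise fall back to (overlapped) batch data
private boolean readAndCalcFromPage() throws IOException, QueryProcessException {
  while (reader.hasNextPage()) {
    Statistics pageStatistics = reader.currentPageStatistics();
    // the page starts after the current window: this window is finished
    if (pageStatistics.getStartTime() >= curEndTime) {
      return true;
    }
    // the page lies entirely inside the window and overlaps nothing
    if (reader.canUseCurrentPageStatistics() && timeRange.contains(
        new TimeRange(pageStatistics.getStartTime(), pageStatistics.getEndTime()))) {
      calcFromStatistics(pageStatistics);
      reader.skipCurrentPage();
      continue;
    }
    // otherwise read the raw (possibly overlapped) page data
    while (reader.hasNextOverlappedPage()) {
      BatchData batchData = reader.nextOverlappedPage();
      calcFromBatch(batchData);
      // data beyond the window end has been reached: stop here
      if (batchData.hasCurrent() && batchData.currentTime() >= curEndTime) {
        return true;
      }
    }
  }
  return false;
}
```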
In `GroupByExecutor`, the different aggregate functions of the same `path` consume the same data, so the `calcFromBatch` method performs the calculation of all aggregate functions over a `BatchData`:
```
-if (Boolean.FALSE.equals(isCalculatedList.get(i))) {
-  AggregateResult aggregateResult = aggregateResultList.get(i);
-  ... // update
-  if (aggregateResult.isCalculatedAggregationResult()) {
-    isCalculatedList.set(i, true);
-    remainingToCalculate--;
-    if (remainingToCalculate == 0) {
-      return aggregateResultList;
+for (Pair<AggregateResult, Integer> result : results) {
+  //if a function has already finished its calculation (e.g. a minimum value), skip it
+  if (result.left.isCalculatedAggregationResult()) {
+    continue;
+  }
+  // perform the calculation
+  ....
+}
+//check whether the data in the current batchData can still be used next time; if so, add it to the cache
+if (batchData.getMaxTimestamp() >= curEndTime) {
+  preCachedData = batchData;
+}
```
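The calculation step elided above (`....`) can be fleshed out by combining this snippet with the batch-traversal rules described earlier: skip points before `curStartTime`, and rewind the cursor after each function. The update call used here is an assumption:
```
// assumed sketch of the elided loop body
for (Pair<AggregateResult, Integer> result : results) {
  if (result.left.isCalculatedAggregationResult()) {
    continue;
  }
  // advance past points that precede the current window
  while (batchData.hasCurrent() && batchData.currentTime() < curStartTime) {
    batchData.next();
  }
  // consume the points inside [curStartTime, curEndTime)
  result.left.updateResultFromPageData(batchData, curEndTime);
  // every function traverses the same BatchData, so rewind the cursor
  batchData.resetBatchData();
}
```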
Similarly, the `isEndCalc()` method decides whether the calculation over a batchData is already finished for an aggregate function, either because the last batch has reached the end of the current interval or because the function's result is final:
```
private boolean isEndCalc(AggregateResult function, BatchData lastBatch) {
return (lastBatch != null && lastBatch.hasCurrent() && lastBatch.currentTime() >= curEndTime)
|| function.isCalculatedAggregationResult();
}
```
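`calcFromCacheData()` is likewise only listed above. A minimal sketch of how the cached `preCachedData` might be replayed before any new I/O, consistent with the caching snippet above but still an assumption:
```
// assumed sketch: replay the BatchData cached by the previous window first;
// return true when nothing more needs to be read for the current window
private boolean calcFromCacheData() throws IOException {
  if (preCachedData != null && preCachedData.hasCurrent()) {
    calcFromBatch(preCachedData);
  }
  // finished if the cache already reaches the window end,
  // or if all aggregate functions have final results
  return (preCachedData != null && preCachedData.hasCurrent()
      && preCachedData.currentTime() >= curEndTime) || isEndCalc();
}
```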

## Aggregation query with value filter
@@ -143,6 +143,37 @@ private class GroupByExecutor {
this.preCachedData = null;
}

private List<Pair<AggregateResult, Integer>> calcResult()
throws IOException, QueryProcessException {
if (calcFromCacheData()) {
return results;
}

//read page data firstly
if (readAndCalcFromPage()) {
return results;
}

//read chunk finally
while (reader.hasNextChunk()) {
Statistics chunkStatistics = reader.currentChunkStatistics();
if (chunkStatistics.getStartTime() >= curEndTime) {
return results;
}
//calc from chunkMetaData
if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
calcFromStatistics(chunkStatistics);
reader.skipCurrentChunk();
continue;
}
if (readAndCalcFromPage()) {
return results;
}
}
return results;
}

private void addAggregateResult(AggregateResult aggrResult, int index) {
results.add(new Pair<>(aggrResult, index));
}
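For context, here is a sketch of how the data set plausibly drives each executor per time window; `curStartTime` and `curEndTime` come from `GroupByEngineDataSet`, and the loop below is an assumption rather than the committed code:
```
// assumed sketch: evaluate one group-by window by clearing each executor's
// previous results and recalculating over the new [curStartTime, curEndTime)
protected RowRecord nextWithoutConstraint() throws IOException {
  try {
    RowRecord record = new RowRecord(curStartTime);
    AggregateResult[] fields = new AggregateResult[paths.size()];
    for (GroupByExecutor executor : pathExecutors.values()) {
      // results of the previous window must not leak into this one
      executor.resetAggregateResults();
      for (Pair<AggregateResult, Integer> result : executor.calcResult()) {
        fields[result.right] = result.left;
      }
    }
    for (AggregateResult res : fields) {
      if (res == null) {
        record.addField(null);
        continue;
      }
      record.addField(res.getResult(), res.getResultDataType());
    }
    return record;
  } catch (QueryProcessException e) {
    throw new IOException(e);
  }
}
```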
@@ -193,48 +224,17 @@ private void calcFromBatch(BatchData batchData) throws IOException {
}
}

-  private void calcFromStatistics(Statistics pageStatistics)
+  private void calcFromStatistics(Statistics statistics)
throws QueryProcessException {
for (Pair<AggregateResult, Integer> result : results) {
//skip if the calculation is already complete
if (result.left.isCalculatedAggregationResult()) {
continue;
}
-      result.left.updateResultFromStatistics(pageStatistics);
+      result.left.updateResultFromStatistics(statistics);
}
}

-  private List<Pair<AggregateResult, Integer>> calcResult()
-      throws IOException, QueryProcessException {
-    if (calcFromCacheData()) {
-      return results;
-    }
-
-    //read page data firstly
-    if (readAndCalcFromPage()) {
-      return results;
-    }
-
-    //read chunk finally
-    while (reader.hasNextChunk()) {
-      Statistics chunkStatistics = reader.currentChunkStatistics();
-      if (chunkStatistics.getStartTime() >= curEndTime) {
-        return results;
-      }
-      //calc from chunkMetaData
-      if (reader.canUseCurrentChunkStatistics() && timeRange.contains(
-          new TimeRange(chunkStatistics.getStartTime(), chunkStatistics.getEndTime()))) {
-        calcFromStatistics(chunkStatistics);
-        reader.skipCurrentChunk();
-        continue;
-      }
-      if (readAndCalcFromPage()) {
-        return results;
-      }
-    }
-    return results;
-  }

// clear all results
private void resetAggregateResults() {
for (Pair<AggregateResult, Integer> result : results) {
@@ -27,7 +27,7 @@ public interface IAggregateReader {

boolean hasNextChunk() throws IOException;

-  boolean canUseCurrentChunkStatistics();
+  boolean canUseCurrentChunkStatistics() throws IOException;

Statistics currentChunkStatistics();

@@ -46,7 +46,7 @@ public boolean hasNextChunk() throws IOException {
}

@Override
-  public boolean canUseCurrentChunkStatistics() {
+  public boolean canUseCurrentChunkStatistics() throws IOException {
Statistics chunkStatistics = currentChunkStatistics();
return !seriesReader.isChunkOverlapped() && containedByTimeFilter(chunkStatistics);
}
@@ -77,7 +77,7 @@ public boolean canUseCurrentPageStatistics() throws IOException {
}

@Override
-  public Statistics currentPageStatistics() throws IOException {
+  public Statistics currentPageStatistics() {
return seriesReader.currentPageStatistics();
}

@@ -24,6 +24,9 @@

import java.io.IOException;

/**
* only for test now
*/
public class SeriesRawDataPointReader implements IPointReader {

private final SeriesRawDataBatchReader batchReader;
