[IOTDB-418] add some impl of NewSeriesReaderWithoutValueFilter #692
qiaojialin merged 31 commits into apache:new_series_reader from liutaohua:new_series_reader
Conversation
Path seriesPath = queryDataSource.getSeriesPath();
TreeSet<TsFileResource> unseqTsFilesSet = new TreeSet<>((o1, o2) -> {
String queryMeasurement = seriesPath.getMeasurement();
List<Long> o1StartTimeList = o1.getChunkMetaDataList().stream()
This ChunkMetadataList should all belong to this measurement, so there is no need to filter again. Besides, only an unclosed TsFile has the chunkMetaDataList.
The sort could compare by startTimeMap.get(seriesPath.getDeviceId())
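A minimal sketch of the suggested sort, assuming TsFileResource exposes a device-to-start-time map as in the quoted code; FileResource below is a simplified stand-in, not IoTDB's real class:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class UnseqSortSketch {
  // Simplified stand-in for TsFileResource: only the device -> start-time map.
  static class FileResource {
    final Map<String, Long> startTimeMap = new HashMap<>();
    FileResource(String device, long start) { startTimeMap.put(device, start); }
  }

  // Compare by the queried device's start time; files without the device sort last.
  static Comparator<FileResource> byDeviceStartTime(String deviceId) {
    return Comparator.comparing(
        (FileResource r) -> r.startTimeMap.getOrDefault(deviceId, Long.MAX_VALUE));
  }

  // Sort a copy of the resources and return the start times in order.
  static List<Long> sortedStarts(String deviceId, List<FileResource> files) {
    List<FileResource> copy = new ArrayList<>(files);
    copy.sort(byDeviceStartTime(deviceId));
    List<Long> out = new ArrayList<>();
    for (FileResource r : copy) {
      out.add(r.startTimeMap.getOrDefault(deviceId, Long.MAX_VALUE));
    }
    return out;
  }
}
```

This avoids scanning chunk metadata entirely, which also sidesteps the unclosed-TsFile issue the reviewer mentions.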
…or-iotdb into new_series_reader
  Conflicts:
    server/src/main/java/org/apache/iotdb/db/query/executor/EngineQueryRouter.java
    server/src/main/java/org/apache/iotdb/db/query/reader/chunkRelated/DiskChunkReader.java
    tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/chunk/AbstractChunkReader.java
…or-iotdb into new_series_reader
  Conflicts:
    server/src/main/java/org/apache/iotdb/db/query/dataset/NewEngineDataSetWithoutValueFilter.java
Please try to merge master to check whether the failed test can pass.
Please help me merge master into IoTDB new_series_reader
samperson1997
left a comment
Hi, thanks a lot for your effort and code. You've really done an excellent job in this PR! I believe it will be a great contribution after being merged.
After an initial quick look at the changed files, I found some minor issues. Most of them are just suggestions, and you can decide whether to fix them or not. Afterwards, I will try to review the logic of all the code in depth.
Looking forward to your further updates and contributions!
.../java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesDataReaderWithoutValueFilter.java
...ain/java/org/apache/iotdb/db/query/reader/seriesRelated/RawDataReaderWithoutValueFilter.java
server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/IAggregateReader.java
server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/IRawReader.java
.../java/org/apache/iotdb/db/query/reader/seriesRelated/SeriesDataReaderWithoutValueFilter.java
server/src/main/java/org/apache/iotdb/db/query/reader/seriesRelated/AbstractDataReader.java
server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
server/src/main/java/org/apache/iotdb/db/query/aggregation/AggregateFunction.java
server/src/main/java/org/apache/iotdb/db/query/aggregation/impl/FirstValueAggrFunc.java
server/src/main/java/org/apache/iotdb/db/query/dataset/DeviceIterateDataSet.java
Statistics statsByType = Statistics.getStatsByType(dataType);
ChunkMetaData metaData = new ChunkMetaData(measurementUid, dataType, 0, statsByType);
if (!isEmpty()) {
  List<TimeValuePair> sortedTimeValuePairList = getSortedTimeValuePairList();
could TimeValuePairSorter be replaced by TVList?
public abstract AggreResultData getResult();

public abstract void calculateValueFromStatistics(Statistics chunkStatistics)
I suggest extracting AggreResultData out of AggregateFunction; each call to calculateValueFromStatistics would return an AggreResultData, and the merging would happen outside.
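A hedged sketch of this refactor idea; PartialResult is a hypothetical stand-in for AggreResultData, and none of these names are the PR's actual API. Each statistics call returns an immutable partial result, and the caller merges them:

```java
public class AggrMergeSketch {
  // Hypothetical stand-in for AggreResultData: a count/sum partial result.
  static class PartialResult {
    final long count;
    final double sum;
    PartialResult(long count, double sum) { this.count = count; this.sum = sum; }
  }

  // Instead of mutating internal state, derive a result from one chunk's stats.
  static PartialResult fromStatistics(long pointCount, double valueSum) {
    return new PartialResult(pointCount, valueSum);
  }

  // Merging happens outside the aggregate function, as the review suggests.
  static PartialResult merge(PartialResult a, PartialResult b) {
    return new PartialResult(a.count + b.count, a.sum + b.sum);
  }
}
```

Keeping the aggregate function stateless makes per-chunk results reusable and easier to combine across sequence and unsequence data.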
public abstract void calculateValueFromPageData(BatchData dataInThisPage) throws IOException;

public abstract void calculateValueFromPageData(BatchData dataInThisPage, long bound) throws IOException;
add javadoc for bound
remove unused methods in this class
}

@Override
public void calculateValueFromStatistics(Statistics chunkStatistics)
Suggested change:
- public void calculateValueFromStatistics(Statistics chunkStatistics)
+ public void calculateValueFromStatistics(Statistics statistics)
groupByPlan.setUnit(unit);
groupByPlan.setDeduplicatedPaths(executePaths);
groupByPlan.setDeduplicatedDataTypes(dataTypes);
groupByPlan.setDeduplicatedDataTypes(tsDataTypes);
Yeah, looks like a bug from before
timeHeap = new TreeSet<>();
for (int i = 0; i < seriesReaderWithoutValueFilterList.size(); i++) {
ManagedSeriesReader reader = seriesReaderWithoutValueFilterList.get(i);
RawDataReaderWithoutValueFilter reader = seriesReaderWithoutValueFilterList.get(i);
The ManagedSeriesReader is for distribution, we'd better retain it.
done, but only the batch interface was kept
// /**
//  * constructor.
//  */
// public AggregateEngineExecutor(AggregationPlan aggregationPlan) {
could this class be removed?
done, and I've removed all the unused classes and tests
if (!batchData.hasCurrent()) {
  batchData = allDataReader.nextBatch();
}
afterPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
Could we ensure that each batch from RawDataReader is not empty?
if (!batchData.hasCurrent()) {
  batchData = allDataReader.nextBatch();
}
cachedPair = new TimeValuePair(batchData.currentTime(), batchData.currentTsPrimitiveType());
The same question as with LinearFill.
@@ -36,10 +37,10 @@
 */
public class DiskChunkReader implements IPointReader, IBatchReader {
DiskChunkReader could perhaps be replaced by a ChunkReader that implements IPointReader.
Maybe ChunkReaderWrap could be replaced by IChunkReader.
* Aggregate results cannot be calculated using Statistics directly, using the data in each page
*
* @param dataInThisPage the data in Page
* @param bound the time upper bounder of data in unsequence data reader
calculate points whose time < bound
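A tiny sketch of the semantics the comment asks the javadoc to state (only points with time strictly below bound are consumed); a plain array stands in for BatchData here:

```java
public class BoundSketch {
  // Count the points an aggregate may consume from a page: strictly time < bound,
  // where bound is the time upper bound coming from the unsequence data reader.
  static int countBelowBound(long[] times, long bound) {
    int n = 0;
    for (long t : times) {
      if (t < bound) {
        n++;
      }
    }
    return n;
  }
}
```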
* <p>
*/
public class DiskChunkReader implements IPointReader, IBatchReader {
public class ChunkReaderIterator implements IPointReader, IBatchReader {
This name is confusing... it sounds as if it iterates over chunk readers.
rename to ChunkDataIterator
...rc/main/java/org/apache/iotdb/db/query/dataset/groupby/GroupByWithoutValueFilterDataSet.java
while (newSeriesReader.hasNextChunk()) {
  if (newSeriesReader.canUseChunkStatistics()) {
    Statistics chunkStatistics = newSeriesReader.currentChunkStatistics();
    function.calculateValueFromStatistics(chunkStatistics);
    if (function.isCalculatedAggregationResult()) {
      return function.getResult();
    }
    newSeriesReader.skipChunkData();
    continue;
  }
  while (newSeriesReader.hasNextPage()) {
    //cal by pageheader
    if (newSeriesReader.canUsePageStatistics()) {
      Statistics pageStatistic = newSeriesReader.currentChunkStatistics();
      function.calculateValueFromStatistics(pageStatistic);
      if (function.isCalculatedAggregationResult()) {
        return function.getResult();
      }
      newSeriesReader.skipPageData();
      continue;
    }
    //cal by pagedata
    while (newSeriesReader.hasNextBatch()) {
      function.calculateValueFromPageData(newSeriesReader.nextBatch());
      if (function.isCalculatedAggregationResult()) {
        return function.getResult();
      }
    }
  }
}
return function.getResult();
Extract this code into a method to avoid repetition in this file.
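A sketch of the kind of extraction being asked for: the repeated chunk/page/batch cascade becomes one shared helper with early exit. The reader and aggregate types below are simplified stand-ins for the PR's interfaces, not the real ones:

```java
import java.util.List;

public class AggregateLoopSketch {
  // Stand-in aggregate: first value, which can finish early, playing the role
  // of the PR's isCalculatedAggregationResult() check.
  static class FirstValueAgg {
    Long first = null;
    boolean isDone() { return first != null; }
    void update(long v) { if (first == null) first = v; }
  }

  // One shared helper replaces the copy-pasted loops at each call site;
  // the outer loop plays hasNextChunk(), the inner one hasNextBatch().
  static Long aggregate(List<long[]> chunks, FirstValueAgg agg) {
    for (long[] chunk : chunks) {
      for (long v : chunk) {
        agg.update(v);
        if (agg.isDone()) {
          return agg.first; // early exit, as in the quoted code
        }
      }
    }
    return agg.first;
  }
}
```

Each aggregate query path in the file could then call the one helper instead of duplicating the cascade.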
this.context = context;
this.dataType = dataType;

if (filter != null) {
what if filter == null and has a TTL?
…or-iotdb into new_series_reader
} else if (seqChunkMetadatas.isEmpty() && !unseqChunkMetadatas.isEmpty()) {
  chunkMetaData = unseqChunkMetadatas.pollFirst();
} else if (!seqChunkMetadatas.isEmpty()) {
  // seq 和 unseq 的 chunk metadata 都不为空
Suggested change:
- // seq 和 unseq 的 chunk metadata 都不为空
+ // neither seqChunkMetadatas nor unseqChunkMetadatas is empty
* unseq file is a very special file that intersects not only with an ordered file, but also with
* another unseq file. So we need a way to find all the files that might be used to intersect the
* current measurement point.
unseq files are very special files that intersect not only with sequence files but also with
other unseq files. So we need to find all TsFiles that overlap with the current chunk and
extract chunks from those resources.
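A small sketch of the overlap test under discussion, using the >= comparison the review suggests elsewhere in this thread (an unseq file overlaps the current chunk when its start time for this device is not after the chunk's end time); the names here are illustrative only:

```java
public class OverlapSketch {
  // True when a chunk ending at chunkEndTime and a file whose data for this
  // device starts at unseqStartTime can contain points for the same timestamps,
  // i.e. the file starts no later than the chunk ends.
  static boolean overlaps(long chunkEndTime, long unseqStartTime) {
    return chunkEndTime >= unseqStartTime;
  }
}
```

The boundary case matters: with a strict >, a file starting exactly at the chunk's end time would be skipped even though both can hold a point at that timestamp.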
private void fillOverlappedFiles() throws IOException {
  while (!unseqFileResource.isEmpty()) {
    Map<String, Long> startTimeMap = unseqFileResource.first().getStartTimeMap();
    Long unSeqStartTime = startTimeMap.getOrDefault(seriesPath.getDevice(), Long.MIN_VALUE);
Suggested change:
- Long unSeqStartTime = startTimeMap.getOrDefault(seriesPath.getDevice(), Long.MIN_VALUE);
+ Long unSeqStartTime = startTimeMap.getOrDefault(seriesPath.getDevice(), Long.MAX_VALUE);
while (!unseqFileResource.isEmpty()) {
  Map<String, Long> startTimeMap = unseqFileResource.first().getStartTimeMap();
  Long unSeqStartTime = startTimeMap.getOrDefault(seriesPath.getDevice(), Long.MIN_VALUE);
  if (chunkMetaData.getEndTime() > unSeqStartTime) {
Suggested change:
- if (chunkMetaData.getEndTime() > unSeqStartTime) {
+ if (chunkMetaData.getEndTime() >= unSeqStartTime) {
//for test
public AbstractDataReader(Path seriesPath, TSDataType dataType,
    Filter filter, QueryContext context, List<TsFileResource> resources) throws IOException {
Suggested change:
- Filter filter, QueryContext context, List<TsFileResource> resources) throws IOException {
+ Filter filter, QueryContext context, List<TsFileResource> seqResources) throws IOException {
* another unseq file. So we need a way to find all the files that might be used to intersect the
* current measurement point.
*/
private void fillOverlappedFiles() throws IOException {
Suggested change:
- private void fillOverlappedFiles() throws IOException {
+ private void unpackOverlappedFiles() throws IOException {
while (!unseqChunkMetadatas.isEmpty()) {
  long startTime = unseqChunkMetadatas.first().getStartTime();

  if (chunkMetaData.getEndTime() > startTime) {
Suggested change:
- if (chunkMetaData.getEndTime() > startTime) {
+ if (chunkMetaData.getEndTime() >= startTime) {
while (!seqChunkMetadatas.isEmpty()) {
  long startTime = seqChunkMetadatas.get(0).getStartTime();

  if (chunkMetaData.getEndTime() > startTime) {
Suggested change:
- if (chunkMetaData.getEndTime() > startTime) {
+ if (chunkMetaData.getEndTime() >= startTime) {
} else {
  Chunk chunk = chunkLoader.getChunk(metaData);
  chunkReader = new ChunkReader(chunk, filter);
  chunkReader.hasNextSatisfiedPage();
This line is strange, it's better to remove it.
Reading data is a very complicated process, and the previous read interfaces were obscure and confusing.
I reimplemented a reader with which all the data in the files can be queried in order.
I have mainly done the following work:
- Renamed IAggregateReader in the TsFile project to IPageSkipRader
- Added IChunkReader. Based on this interface, there are two implementations, ChunkReader and MemChunkReader
- Added ChunkMetadata for ReadonlyMemChunk and PageHeader for MemChunkReader
- Added the IAggregateReader and IRawReader interfaces and the AbstractDataReader abstract class to complete all functions of the new reader, respectively realizing RawDataReaderWithoutValueFilter, SeriesDataReaderWithoutValueFilter and SeriesDataReaderWithValueFilter
- Refactored group by, raw query, aggregate query, and so on
- In the previous group by implementation, the open and closed interval definition was wrong; I changed [0-100] to [0-100)
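A minimal sketch of the half-open interval fix described in the last point: with [start, end) buckets, a point exactly on an upper edge belongs to the next bucket, so no point is counted twice. The bucket math here is illustrative, not the PR's implementation:

```java
public class GroupBySketch {
  // Bucket i covers [origin + i*unit, origin + (i+1)*unit): half-open, so a
  // point on a boundary falls into exactly one bucket.
  static long bucketOf(long time, long origin, long unit) {
    return (time - origin) / unit;
  }
}
```

With the old closed intervals [0-100] and [100-200], a point at time 100 would have matched both buckets; the half-open definition assigns it only to the second.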