[IOTDB-386] Vectorize the raw data query process #652
Conversation
* next to next batch in SeriesReaderWithoutValueFilter
* move fill data to executeQueryStatement
* add fill buffer in EngineDataSetWithoutValueFilter
* Annotate some code to avoid compilation errors
* remove hasNext() in BatchData
// add value buffer of current series
ByteBuffer valueBuffer = ByteBuffer.allocate(valueBAOSList[seriesIndex].size());
valueBuffer.put(valueBAOSList[seriesIndex].toByteArray());

use getBuf() instead of toByteArray()

fixed
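The point of the suggestion is that ByteArrayOutputStream.toByteArray() allocates and copies a fresh array on every call, while a subclass exposing the internal buffer avoids the copy. A minimal sketch of the idea, assuming a PublicBAOS-like helper (IoTDB's tsfile module has a similar class; the one below is a stand-in written here for illustration):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical sketch: a ByteArrayOutputStream subclass that exposes its
// internal buffer, so callers can avoid the array copy made by toByteArray().
class PublicBAOS extends ByteArrayOutputStream {
  public byte[] getBuf() {
    // 'buf' is the protected field inherited from ByteArrayOutputStream
    return buf;
  }
}

public class BufDemo {
  public static void main(String[] args) {
    PublicBAOS baos = new PublicBAOS();
    baos.write(1);
    baos.write(2);
    // wrap only the first size() valid bytes of the shared internal buffer;
    // no intermediate byte[] copy is created
    ByteBuffer valueBuffer = ByteBuffer.wrap(baos.getBuf(), 0, baos.size());
    System.out.println(valueBuffer.remaining()); // prints 2
  }
}
```

The trade-off is that the returned array is shared and may be larger than size(), so callers must respect the size() bound.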
// add bitmap buffer of current series
ByteBuffer bitmapBuffer = ByteBuffer.allocate(bitmapBAOSList[seriesIndex].size());
bitmapBuffer.put(bitmapBAOSList[seriesIndex].toByteArray());

use getBuf() instead of toByteArray()

fixed
synchronized long genTaskId() {
  taskId++;
  return taskId;
}

If the only purpose of this class is to generate a globally unique taskId for the external sort job, why don't we use an AtomicLong variable? Then there is no need for synchronized, which is too heavy.

changed
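The suggested change can be sketched as below: AtomicLong.incrementAndGet() gives a lock-free, thread-safe, monotonically increasing id without taking a monitor lock (class and field names here are illustrative, not the actual IoTDB code):

```java
import java.util.concurrent.atomic.AtomicLong;

public class TaskIdDemo {
  // one shared counter; incrementAndGet is atomic, so no synchronized needed
  private static final AtomicLong taskId = new AtomicLong(0);

  static long genTaskId() {
    return taskId.incrementAndGet();
  }

  public static void main(String[] args) {
    System.out.println(genTaskId()); // prints 1
    System.out.println(genTaskId()); // prints 2
  }
}
```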
@@ -30,6 +30,7 @@
 import org.apache.iotdb.tsfile.read.common.Path;
 import org.apache.iotdb.tsfile.read.filter.TimeFilter;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.read.reader.IBatchReader;

delete this import, it isn't used.

deleted
public class CachedDiskChunkReader implements IPointReader {

-  private ChunkReader chunkReader;
+  private AbstractChunkReader AbstractChunkReader;

The first letter of the variable should be lowercase.

fixed
public IBatchReader getIBatchReader() throws IOException {
  if (type.equals(ChunkReaderType.DISK_CHUNK)) {
    Chunk chunk = chunkLoader.getChunk(chunkMetaData);
    AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, filter);
    return new DiskChunkReader(AbstractChunkReader);

It seems this method is not used.

removed
-  private ChunkReader chunkReader;
+  private AbstractChunkReader AbstractChunkReader;

The first letter of the variable should be lowercase.

fixed
@Override
public boolean hasNextBatch() throws IOException {
  return false;
}

@Override
public BatchData nextBatch() throws IOException {
  return null;
}

Is this fake implementation for future use? Why should DiskChunkReader implement IBatchReader?

We could remove this fake implementation for now. I plan to remove TimeValuePair everywhere in the future.
@@ -35,8 +35,8 @@ public CachedUnseqResourceMergeReader(List<Chunk> chunks, TSDataType dataType)
   super(dataType);
   int priorityValue = 1;
   for (Chunk chunk : chunks) {
-    ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
-    addReaderWithPriority(new CachedDiskChunkReader(chunkReader), priorityValue++);
+    AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, null);

The first letter of the variable should be lowercase.

fixed
Excellent job! Now the structure of the query process is clearer. And without constructing useless RowRecord objects, the performance does improve.
chunkMetaDataList = chunkMetaDataList.stream()
    .sorted(Comparator.comparing(ChunkMetaData::getStartTime)).collect(Collectors.toList());

There is no need to use stream().sorted(); you can directly call chunkMetaDataList.sort(Comparator.comparing(ChunkMetaData::getStartTime)) to sort in place.

changed
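The difference is that the stream pipeline builds a new sorted list and replaces the reference, while List.sort reorders the existing list in place with no extra list allocation. A runnable sketch (the ChunkMetaData class below is a simplified stand-in keeping only the sort key):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortDemo {
  // hypothetical stand-in for ChunkMetaData, keeping only the field we sort on
  static class ChunkMetaData {
    final long startTime;
    ChunkMetaData(long startTime) { this.startTime = startTime; }
    long getStartTime() { return startTime; }
  }

  public static void main(String[] args) {
    List<ChunkMetaData> chunkMetaDataList = new ArrayList<>();
    chunkMetaDataList.add(new ChunkMetaData(30));
    chunkMetaDataList.add(new ChunkMetaData(10));
    chunkMetaDataList.add(new ChunkMetaData(20));
    // in-place sort: no stream pipeline, no Collectors, no new list
    chunkMetaDataList.sort(Comparator.comparingLong(ChunkMetaData::getStartTime));
    System.out.println(chunkMetaDataList.get(0).getStartTime()); // prints 10
  }
}
```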
public BatchData nextBatch() throws IOException {
  return batchData;
}

If I keep calling nextBatch() without calling hasNextBatch(), I always get the same batchData, which is obviously counterintuitive. At the least, you should add a comment to document the relationship between nextBatch() and hasNextBatch().

Fixed. Lei Rui also plans to standardize the semantics of next() and hasNext() in another PR, so I just fixed this class and left the others.
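The contract the reviewer is asking for can be sketched as follows: hasNextBatch() caches the next batch, and nextBatch() hands it out exactly once, so repeated nextBatch() calls cannot silently return the same data. This is an illustrative sketch, not the actual IoTDB reader:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.NoSuchElementException;

public class BatchReaderDemo {
  // two dummy batches standing in for pages read from disk
  private final Deque<int[]> batches = new ArrayDeque<>(List.of(new int[]{1}, new int[]{2}));
  private int[] cached;

  /** Caches the next batch; nextBatch() must only be called after this returns true. */
  public boolean hasNextBatch() {
    if (cached == null && !batches.isEmpty()) {
      cached = batches.poll();
    }
    return cached != null;
  }

  /** Returns and clears the cached batch, so each batch is handed out exactly once. */
  public int[] nextBatch() {
    if (cached == null) {
      throw new NoSuchElementException("call hasNextBatch() first");
    }
    int[] batch = cached;
    cached = null;
    return batch;
  }

  public static void main(String[] args) {
    BatchReaderDemo reader = new BatchReaderDemo();
    int count = 0;
    while (reader.hasNextBatch()) {
      count += reader.nextBatch().length;
    }
    System.out.println(count); // prints 2
  }
}
```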
public int encodeInt(int value, long time);

public long encodeLong(long value, long time);

public float encodeFloat(float value, long time);

public double encodeDouble(double value, long time);

no need to add public

deleted
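The reason the modifier is redundant: interface methods are implicitly public and abstract in Java, so writing public adds nothing. A tiny sketch with an illustrative interface name:

```java
public class EncoderDemo {
  // interface methods are implicitly public abstract; no modifier needed
  interface Encoder {
    int encodeInt(int value, long time);
  }

  public static void main(String[] args) {
    // implement the single-method interface with a lambda
    Encoder e = (value, time) -> value + 1;
    System.out.println(e.encodeInt(41, 0L)); // prints 42
  }
}
```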
@@ -101,10 +101,10 @@ public static long collectFileSizes(List<TsFileResource> seqFiles, List<TsFileRe
 }

 public static int writeChunkWithoutUnseq(Chunk chunk, IChunkWriter chunkWriter) throws IOException {
-  ChunkReader chunkReader = new ChunkReaderWithoutFilter(chunk);
+  AbstractChunkReader AbstractChunkReader = new ChunkReader(chunk, null);

same as above

fixed
  this(host, port, Config.DEFAULT_USER, Config.DEFAULT_PASSWORD, 10000);
}

public Session(String host, String port, String username, String password) {
-  this(host, Integer.parseInt(port), username, password);
+  this(host, Integer.parseInt(port), username, password, 10000);
}

public Session(String host, int port, String username, String password) {
  this(host, port, username, password, 10000);

All these magic numbers '10000' should be replaced with a named constant.

Moved to Config in the Session module.
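The fix can be sketched as below: every constructor delegates to one canonical constructor, and the default lives in a single named constant. The constant name and Config placement here are illustrative, not the actual IoTDB API:

```java
public class SessionDemo {
  // single named default instead of a bare 10000 repeated in each constructor
  static final int DEFAULT_FETCH_SIZE = 10000;

  final int fetchSize;

  SessionDemo(int fetchSize) {
    this.fetchSize = fetchSize;
  }

  // overload without a fetch size falls back to the named constant
  SessionDemo() {
    this(DEFAULT_FETCH_SIZE);
  }

  public static void main(String[] args) {
    System.out.println(new SessionDemo().fetchSize); // prints 10000
  }
}
```

The benefit is that changing the default later is a one-line edit, and the constant's name documents what the number means.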
protected IChunkLoader chunkLoader;
protected List<ChunkMetaData> chunkMetaDataList;
protected AbstractChunkReader AbstractChunkReader;

lowercase it

fixed
@@ -40,7 +40,7 @@
 protected List<ChunkMetaData> chunkMetaDataList;
 private int currentChunkIndex = 0;

-private ChunkReader chunkReader;
+private AbstractChunkReader AbstractChunkReader;

lowercase it

fixed
jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBPreparedInsertionStatement.java
server/src/test/java/org/apache/iotdb/db/engine/merge/MergeOverLapTest.java
server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest.java
server/src/test/java/org/apache/iotdb/db/query/reader/universal/PriorityMergeReaderTest2.java
server/src/test/java/org/apache/iotdb/db/writelog/recover/UnseqTsFileRecoverTest.java

I think it's OK for me.

Hi, everything looks OK to me, and I also tested the raw data query process, including both sequence and unsequence data, with some scripts. I really look forward to the following steps of query process optimization and code refactoring. 🎉
@@ -24,6 +24,7 @@
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
+import org.apache.iotdb.jdbc.IoTDBStatement;

This import is unused. Maybe you can remove it.

fixed
 * judge whether the specified column value is null in the current position
 *
 * @param index column index
 * @return
 */

This Javadoc is not consistent with the function.

updated
 * <p>
 * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
 * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
 * <p>
- * This class is used in {@link org.apache.iotdb.db.query.reader.resourceRelated.UnseqResourceMergeReader}.
+ * This class is used in {@link NewUnseqResourceMergeReader}.

This class is actually used in ChunkReaderWrap.
(This class may be deleted like the other files, too.)

deleted
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

This unused import could be removed.

removed
 * Create a ChunkReader with priority for each ChunkMetadata and put the ChunkReader to
 * mergeReader one by one
 */
@Override public boolean hasNextBatch() throws IOException {

This could be in two lines:
@Override
public boolean hasNextBatch() throws IOException {

fixed
public void skipPageData() {
  chunkReader.skipPageData();
}

return filter.satisfy(chunkMetaData.getStatistics());

In the method chunkSatisfied, this could be changed to:
return filter == null || filter.satisfy(chunkMetaData.getStatistics());

fixed
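The suggestion makes the check null-safe: a null filter means "no filtering", so every chunk is considered satisfied, and short-circuit evaluation guarantees filter.satisfy is never called on null. A sketch with simplified stand-in types for the IoTDB Filter and Statistics classes:

```java
import java.util.function.Predicate;

public class FilterDemo {
  // Predicate<Long> stands in for Filter, the long value for chunk statistics
  static boolean chunkSatisfied(Predicate<Long> filter, long statistics) {
    // null filter means no filtering: accept the chunk unconditionally
    return filter == null || filter.test(statistics);
  }

  public static void main(String[] args) {
    System.out.println(chunkSatisfied(null, 5L));        // prints true
    System.out.println(chunkSatisfied(t -> t > 10, 5L)); // prints false
  }
}
```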
private Filter timeFilter;
private int index = 0; // used to index current metadata in metaDataList

private static final int DEFAULT_BATCH_DATA_SIZE = 10000;

Should this be moved into the config file so that users can customize it?

There is a parameter called aggregate_fetch_size, which can be used.
// cache batch data for unsequence reader
private BatchData unseqBatchData;

private static final int DEFAULT_BATCH_DATA_SIZE = 10000;

ditto

fixed as above
Refactor the file reader and chunk reader organization in TsFile.
Implement IBatchReader in SeriesReaderWithoutValueFilter.
Add a NewUnseqResourceMergeReader that only loads chunks when needed, instead of loading all chunks up front. @Zesong Sun
Return TSQueryDataSet in the first query request. @dawei Liu
Avoid constructing RowRecord in NewEngineDataSetWithoutValueFilter. @yuan Tian
Move limit & slimit from the client to the server. @LEI Rui
Class names starting with Old are for Aggregation and Group By; I leave them for future work.
The performance evaluation:
On my Mac: one device, ten sensors, long datatype, RLE & SNAPPY. Each time series contains 1 billion data points, 562M of data on disk in total.
I query the raw data of ten time series with a time filter that selects 10 million points in total.
master
f_batch_reader