fix Zesong Sun's review
qiaojialin committed Dec 26, 2019
1 parent 5144cfd commit 837aaa5
Showing 11 changed files with 22 additions and 29 deletions.
1 change: 0 additions & 1 deletion JDBCExample.java
@@ -24,7 +24,6 @@
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.sql.Statement;
-import org.apache.iotdb.jdbc.IoTDBStatement;

 public class JDBCExample {

3 changes: 2 additions & 1 deletion
@@ -765,7 +765,8 @@ private void constructOneRow() {
   /**
    * judge whether the specified column value is null in the current position
    *
-   * @param index column index
+   * @param index series index
+   * @param rowNum current position
    */
   private boolean isNull(int index, int rowNum) {
     byte bitmap = currentBitmap[index];
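
The body of isNull is collapsed in the view above. The following self-contained sketch shows how such a bitmap null check can work; the high-bit-first layout and the FLAG constant are illustrative assumptions, not taken from this diff.

public class BitmapNullCheck {

  private static final int FLAG = 0x80; // assumed: highest bit = first row in the byte

  // one bitmap byte per series; a cleared bit marks a null value (assumed layout)
  private final byte[] currentBitmap = {(byte) 0b10110000};

  private boolean isNull(int index, int rowNum) {
    byte bitmap = currentBitmap[index];
    int shift = rowNum % 8; // position of the row inside its bitmap byte
    return ((FLAG >>> shift) & bitmap) == 0;
  }

  public static void main(String[] args) {
    BitmapNullCheck check = new BitmapNullCheck();
    System.out.println(check.isNull(0, 0)); // false: bit 0 of 10110000 is set
    System.out.println(check.isNull(0, 1)); // true: bit 1 of 10110000 is clear
  }
}
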
2 changes: 1 addition & 1 deletion server/src/assembly/resources/conf/iotdb-engine.properties
@@ -176,7 +176,7 @@ kerberos_principal=your principal
 write_read_free_memory_proportion=6:3:1

 # The amount of data read each time in batch (the number of data strips, that is, the number of different timestamps.)
-aggregate_fetch_size=100000
+batch_size=100000

 # Size of log buffer in each log node(in byte).
 # If WAL is enabled and the size of a insert plan is smaller than this parameter, then the insert plan will be rejected by WAL

12 changes: 6 additions & 6 deletions server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -152,9 +152,9 @@ public class IoTDBConfig {
   private int maxMemtableNumber = 20;

   /**
-   * The amount of data that is read every time when IoTDB merges data.
+   * The amount of data that is read every time.
    */
-  private int aggregateFetchSize = 100000;
+  private int batchSize = 100000;

   /**
    * How many threads can concurrently flush. When <= 0, use CPU core number.
@@ -648,12 +648,12 @@ void setMultiDirStrategyClassName(String multiDirStrategyClassName) {
     this.multiDirStrategyClassName = multiDirStrategyClassName;
   }

-  public int getAggregateFetchSize() {
-    return aggregateFetchSize;
+  public int getBatchSize() {
+    return batchSize;
   }

-  void setAggregateFetchSize(int aggregateFetchSize) {
-    this.aggregateFetchSize = aggregateFetchSize;
+  void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
   }

   public int getMaxMemtableNumber() {

4 changes: 2 additions & 2 deletions
@@ -160,8 +160,8 @@ private void loadProps() {
     conf.setMultiDirStrategyClassName(properties.getProperty("multi_dir_strategy",
         conf.getMultiDirStrategyClassName()));

-    conf.setAggregateFetchSize(Integer.parseInt(properties.getProperty("aggregate_fetch_size",
-        Integer.toString(conf.getAggregateFetchSize()))));
+    conf.setBatchSize(Integer.parseInt(properties.getProperty("batch_size",
+        Integer.toString(conf.getBatchSize()))));

     long tsfileSizeThreshold = Long.parseLong(properties
         .getProperty("tsfile_size_threshold",
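
The getProperty-with-default idiom above can be exercised on its own; everything in this sketch is standard java.util.Properties, with the key and default mirroring the diff.

import java.util.Properties;

public class BatchSizePropertyDemo {

  public static void main(String[] args) {
    Properties properties = new Properties(); // in IoTDB, loaded from iotdb-engine.properties
    int batchSize = 100000;                   // compiled-in default, as in IoTDBConfig

    // key absent: parseInt falls back to the stringified default
    batchSize = Integer.parseInt(
        properties.getProperty("batch_size", Integer.toString(batchSize)));
    System.out.println(batchSize); // 100000

    // key present: the configured value wins
    properties.setProperty("batch_size", "50000");
    batchSize = Integer.parseInt(
        properties.getProperty("batch_size", Integer.toString(batchSize)));
    System.out.println(batchSize); // 50000
  }
}
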
2 changes: 1 addition & 1 deletion GroupByWithValueFilterDataSet.java
@@ -61,7 +61,7 @@ public GroupByWithValueFilterDataSet(long queryId, List<Path> paths, long unit,
       long slidingStep, long startTime, long endTime) {
     super(queryId, paths, unit, slidingStep, startTime, endTime);
     this.allDataReaderList = new ArrayList<>();
-    this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getAggregateFetchSize();
+    this.timeStampFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
   }

   /**

2 changes: 1 addition & 1 deletion AggregateEngineExecutor.java
@@ -72,7 +72,7 @@ public AggregateEngineExecutor(List<Path> selectedSeries, List<String> aggres,
     this.selectedSeries = selectedSeries;
     this.aggres = aggres;
     this.expression = expression;
-    this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getAggregateFetchSize();
+    this.aggregateFetchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();
   }

   /**

1 change: 0 additions & 1 deletion DiskChunkReader.java
@@ -34,7 +34,6 @@
  * Note that <code>ChunkReader</code> is an abstract class with three concrete classes, two of which
  * are used here: <code>ChunkReaderWithoutFilter</code> and <code>ChunkReaderWithFilter</code>.
  * <p>
- * This class is used in {@link NewUnseqResourceMergeReader}.
  */
 public class DiskChunkReader implements IPointReader, IBatchReader {

9 changes: 5 additions & 4 deletions NewUnseqResourceMergeReader.java
@@ -23,7 +23,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
-import java.util.stream.Collectors;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.cache.DeviceMetaDataCache;
 import org.apache.iotdb.db.engine.modification.Modification;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -55,7 +55,7 @@ public class NewUnseqResourceMergeReader implements IBatchReader {
   private Filter timeFilter;
   private int index = 0; // used to index current metadata in metaDataList

-  private static final int DEFAULT_BATCH_DATA_SIZE = 10000;
+  private int batchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();

   private BatchData batchData;
   private TSDataType dataType;
@@ -137,14 +137,15 @@ public NewUnseqResourceMergeReader(Path seriesPath, TSDataType dataType,
    * Create a ChunkReader with priority for each ChunkMetadata and put the ChunkReader to
    * mergeReader one by one
    */
-  @Override public boolean hasNextBatch() throws IOException {
+  @Override
+  public boolean hasNextBatch() throws IOException {
     if (hasCachedBatch) {
       return true;
     }

     batchData = new BatchData(dataType, true);

-    for (int rowCount = 0; rowCount < DEFAULT_BATCH_DATA_SIZE; rowCount++) {
+    for (int rowCount = 0; rowCount < batchSize; rowCount++) {
       if (priorityMergeReader.hasNext()) {

         // current time of priority merge reader >= next chunks start time

9 changes: 3 additions & 6 deletions SeriesReaderWithoutValueFilter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iotdb.db.query.reader.seriesRelated;

+import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
 import org.apache.iotdb.db.exception.StorageEngineException;
 import org.apache.iotdb.db.query.context.QueryContext;
@@ -49,9 +50,7 @@ public class SeriesReaderWithoutValueFilter implements IBatchReader, IPointReade
   // cache batch data for unsequence reader
   private BatchData unseqBatchData;

-  private static final int DEFAULT_BATCH_DATA_SIZE = 10000;
-
-  private int batchDataSize;
+  private int batchSize = IoTDBDescriptor.getInstance().getConfig().getBatchSize();

   /**
    * will be removed after removing IPointReader
@@ -87,7 +86,6 @@ public SeriesReaderWithoutValueFilter(Path seriesPath, TSDataType dataType, Filt
       this.unseqResourceMergeReader = new NewUnseqResourceMergeReader(seriesPath, dataType,
           queryDataSource.getUnseqResources(), context, null);
     }
-    this.batchDataSize = DEFAULT_BATCH_DATA_SIZE;
   }

   /**
@@ -97,7 +95,6 @@ public SeriesReaderWithoutValueFilter(Path seriesPath, TSDataType dataType, Filt
       IBatchReader unseqResourceMergeReader) {
     this.seqResourceIterateReader = seqResourceIterateReader;
     this.unseqResourceMergeReader = unseqResourceMergeReader;
-    this.batchDataSize = DEFAULT_BATCH_DATA_SIZE;
   }


@@ -137,7 +134,7 @@ public BatchData nextBatch() throws IOException {
     // if the count reaches batch data size
     int count = 0;
     BatchData batchData = new BatchData(seqBatchData.getDataType(), true);
-    while (count < batchDataSize && hasNextInSeq() && hasNextInUnSeq()) {
+    while (count < batchSize && hasNextInSeq() && hasNextInUnSeq()) {
       long timeInSeq = seqBatchData.currentTime();
       long timeInUnseq = unseqBatchData.currentTime();
       Object currentValue;
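
The merging half of nextBatch is collapsed above. Below is a minimal sketch of the two-way merge it performs, with the IoTDB readers replaced by plain queues of (time, value) pairs; the tie-breaking rule (equal timestamps take the unsequence value) is an assumption based on IoTDB's overwrite semantics, not something shown in this diff.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

public class TwoWayMergeDemo {

  public static void main(String[] args) {
    // stand-ins for seqBatchData / unseqBatchData: (time, value) pairs in time order
    Deque<long[]> seq = new ArrayDeque<>(List.of(
        new long[]{1, 10}, new long[]{2, 20}, new long[]{3, 30}));
    Deque<long[]> unseq = new ArrayDeque<>(List.of(
        new long[]{2, 99}, new long[]{4, 40}));

    while (!seq.isEmpty() && !unseq.isEmpty()) {
      long timeInSeq = seq.peek()[0];
      long timeInUnseq = unseq.peek()[0];
      long[] current;
      if (timeInUnseq <= timeInSeq) {
        current = unseq.poll();   // unsequence data has priority on ties (assumed)
        if (timeInUnseq == timeInSeq) {
          seq.poll();             // drop the overwritten sequence point
        }
      } else {
        current = seq.poll();
      }
      System.out.println("t=" + current[0] + " v=" + current[1]);
    }
    // drain whichever side still has points
    seq.forEach(p -> System.out.println("t=" + p[0] + " v=" + p[1]));
    unseq.forEach(p -> System.out.println("t=" + p[0] + " v=" + p[1]));
  }
}
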
6 changes: 1 addition & 5 deletions FileSeriesReader.java
@@ -37,7 +37,6 @@ public FileSeriesReader(IChunkLoader chunkLoader,
     super(chunkLoader, chunkMetaDataList, filter);
   }

-
   @Override
   protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {
     Chunk chunk = chunkLoader.getChunk(chunkMetaData);
@@ -46,10 +45,7 @@ protected void initChunkReader(ChunkMetaData chunkMetaData) throws IOException {

   @Override
   protected boolean chunkSatisfied(ChunkMetaData chunkMetaData) {
-    if (filter == null ) {
-      return true;
-    }
-    return filter.satisfy(chunkMetaData.getStatistics());
+    return filter == null || filter.satisfy(chunkMetaData.getStatistics());
   }

 }
