DRILL-6147: Adding Columnar Parquet Batch Sizing functionality
closes #1330
sachouche authored and ilooner committed Jun 30, 2018
1 parent 140d09e commit f481a7c2833b8c7ebabe02a37590cdd3e559ca5e
Showing 44 changed files with 3,525 additions and 429 deletions.
@@ -316,6 +316,13 @@ private ExecConstants() {
public static final String PARQUET_FLAT_READER_BULK = "store.parquet.flat.reader.bulk";
public static final OptionValidator PARQUET_FLAT_READER_BULK_VALIDATOR = new BooleanValidator(PARQUET_FLAT_READER_BULK);

// Controls the flat Parquet reader batching constraints (number of records and memory limit)
public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";
public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, Integer.MAX_VALUE);
public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = "store.parquet.flat.batch.memory_size";
// This configuration overrides the common batch sizing memory configuration property
public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE);

public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE);
public static final BooleanValidator JSON_EXTENDED_TYPES = new BooleanValidator("store.json.extended_types");
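The two flat-reader batching options above are registered with system and session scope (see the OptionDefinition changes later in this commit), so they can be tuned per session. A minimal usage sketch, assuming the standard Drill JDBC driver and a local Drillbit; the option names come from this commit, while the URL and numeric values are only illustrative:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class ParquetBatchSizingExample {
  public static void main(String[] args) throws Exception {
    try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
         Statement stmt = conn.createStatement()) {
      // Cap the flat Parquet reader at 32K records per batch (the validator requires >= 1).
      stmt.execute("ALTER SESSION SET `store.parquet.flat.batch.num_records` = 32768");
      // Cap each batch at 16 MB of memory; the validator above also allows 0,
      // presumably deferring to the common batch sizing property.
      stmt.execute("ALTER SESSION SET `store.parquet.flat.batch.memory_size` = 16777216");
    }
  }
}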
@@ -689,4 +696,13 @@ public static String bootDefaultFor(String name) {

public static final String ALLOW_LOOPBACK_ADDRESS_BINDING = "drill.exec.allow_loopback_address_binding";

/** Enables batch size statistics logging */
public static final String STATS_LOGGING_BATCH_SIZE_OPTION = "drill.exec.stats.logging.batch_size";
public static final BooleanValidator STATS_LOGGING_BATCH_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_BATCH_SIZE_OPTION);

/** Enables fine-grained batch size statistics logging */
public static final String STATS_LOGGING_FG_BATCH_SIZE_OPTION = "drill.exec.stats.logging.fine_grained.batch_size";
public static final BooleanValidator STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR = new BooleanValidator(STATS_LOGGING_FG_BATCH_SIZE_OPTION);

}
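The new statistics-logging flags are plain boolean options with the same system/session scope; a hedged sketch of toggling them for a session, under the same JDBC assumptions as the previous example:

try (Connection conn = DriverManager.getConnection("jdbc:drill:drillbit=localhost");
     Statement stmt = conn.createStatement()) {
  // Coarse batch size statistics logging.
  stmt.execute("ALTER SESSION SET `drill.exec.stats.logging.batch_size` = true");
  // Fine-grained batch size statistics logging.
  stmt.execute("ALTER SESSION SET `drill.exec.stats.logging.fine_grained.batch_size` = true");
}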
@@ -47,6 +47,8 @@
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.util.CallBack;
import org.apache.drill.exec.util.record.RecordBatchStats;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.NullableVarCharVector;
import org.apache.drill.exec.vector.SchemaChangeCallBack;
@@ -80,6 +82,8 @@ public class ScanBatch implements CloseableRecordBatch {
private final BufferAllocator allocator;
private final List<Map<String, String>> implicitColumnList;
private String currentReaderClassName;
private final RecordBatchStatsContext batchStatsLogging;

/**
*
* @param context
@@ -117,6 +121,7 @@ public ScanBatch(FragmentContext context,
this.implicitColumnList = implicitColumnList;
addImplicitVectors();
currentReader = null;
batchStatsLogging = new RecordBatchStatsContext(context, oContext);
} finally {
oContext.getStats().stopProcessing();
}
@@ -174,6 +179,7 @@ public IterOutcome next() {
boolean isNewSchema = mutator.isNewSchema();
populateImplicitVectorsAndSetCount();
oContext.getStats().batchReceived(0, recordCount, isNewSchema);
logRecordBatchStats();

if (recordCount == 0) {
currentReader.close();
@@ -291,6 +297,45 @@ public VectorWrapper<?> getValueAccessorById(Class<?> clazz, int... ids) {
return container.getValueAccessorById(clazz, ids);
}

private void logRecordBatchStats() {
final int MAX_FQN_LENGTH = 50;

if (recordCount == 0) {
return; // NOOP
}

RecordBatchStats.logRecordBatchStats(
batchStatsLogging.getContextOperatorId(),
getFQNForLogging(MAX_FQN_LENGTH),
this,
batchStatsLogging,
logger);
}

/** Returns the FQN for logging, keeping only the last {@code maxLength} characters if it is too long */
private String getFQNForLogging(int maxLength) {
final String FQNKey = "FQN";
final ValueVector v = mutator.implicitFieldVectorMap.get(FQNKey);

final Object fqnObj;

if (v == null
|| v.getAccessor().getValueCount() == 0
|| (fqnObj = ((NullableVarCharVector) v).getAccessor().getObject(0)) == null) {

return "NA";
}

String fqn = fqnObj.toString();

if (fqn.length() > maxLength) {
fqn = fqn.substring(fqn.length() - maxLength);
}

return fqn;
}

/**
* Row set mutator implementation provided to record readers created by
* this scan batch. Made visible so that tests can create this mutator
@@ -149,6 +149,8 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
@@ -229,6 +231,8 @@ public static CaseInsensitiveMap<OptionDefinition> createDefaultOptionDefinition
new OptionDefinition(ExecConstants.ENABLE_VECTOR_VALIDATOR),
new OptionDefinition(ExecConstants.ENABLE_ITERATOR_VALIDATOR),
new OptionDefinition(ExecConstants.OUTPUT_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, false)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.STATS_LOGGING_BATCH_FG_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
new OptionDefinition(ExecConstants.FRAG_RUNNER_RPC_TIMEOUT_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM, true, true)),
};

@@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

@@ -38,14 +38,14 @@ public BatchReader(ReadState readState) {

public int readBatch() throws Exception {
ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
long recordsToRead = Math.min(getReadCount(firstColumnStatus), readState.getRecordsToRead());
int currBatchNumRecords = readState.batchSizerMgr().getCurrentRecordsPerBatch();
long recordsToRead = Math.min(currBatchNumRecords, readState.getRemainingValuesToRead());
int readCount = readRecords(firstColumnStatus, recordsToRead);

readState.fillNullVectors(readCount);
return readCount;
}

protected abstract long getReadCount(ColumnReader<?> firstColumnStatus);

protected abstract int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception;

protected void readAllFixedFields(long recordsToRead) throws Exception {
@@ -59,14 +59,14 @@ protected void readAllFixedFields(long recordsToRead) throws Exception {
}

protected void readAllFixedFieldsSerial(long recordsToRead) throws IOException {
for (ColumnReader<?> crs : readState.getColumnReaders()) {
for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
crs.processPages(recordsToRead);
}
}

protected void readAllFixedFieldsParallel(long recordsToRead) throws Exception {
ArrayList<Future<Long>> futures = Lists.newArrayList();
for (ColumnReader<?> crs : readState.getColumnReaders()) {
for (ColumnReader<?> crs : readState.getFixedLenColumnReaders()) {
Future<Long> f = crs.processPagesAsync(recordsToRead);
if (f != null) {
futures.add(f);
@@ -105,15 +105,6 @@ public MockBatchReader(ReadState readState) {
super(readState);
}

@Override
protected long getReadCount(ColumnReader<?> firstColumnStatus) {
if (readState.recordsRead() == readState.schema().getGroupRecordCount()) {
return 0;
}
return Math.min(ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH,
readState.schema().getGroupRecordCount() - readState.recordsRead());
}

@Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) {
readState.updateCounts((int) recordsToRead);
@@ -132,16 +123,16 @@ public FixedWidthReader(ReadState readState) {
super(readState);
}

@Override
protected long getReadCount(ColumnReader<?> firstColumnStatus) {
return Math.min(readState.schema().getRecordsPerBatch(),
firstColumnStatus.columnChunkMetaData.getValueCount() - firstColumnStatus.totalValuesRead);
}

@Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
readAllFixedFields(recordsToRead);
return firstColumnStatus.getRecordsReadInCurrentPass();

Preconditions.checkNotNull(firstColumnStatus);
readState.setValuesReadInCurrentPass(firstColumnStatus.getRecordsReadInCurrentPass()); // get the number of rows read

readState.updateCounts((int) recordsToRead); // update the shared Reader State

return readState.getValuesReadInCurrentPass();
}
}

@@ -156,16 +147,22 @@ public VariableWidthReader(ReadState readState) {
super(readState);
}

@Override
protected long getReadCount(ColumnReader<?> firstColumnStatus) {
return ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
}

@Override
protected int readRecords(ColumnReader<?> firstColumnStatus, long recordsToRead) throws Exception {
// We should not rely on "firstColumnStatus.getRecordsReadInCurrentPass()" when dealing
// with variable-length columns, as each reader might return a different number of records;
// the batch size will be the lowest of these values. The variable-length column readers
// update the "readState" object to reflect the correct information.
long fixedRecordsToRead = readState.varLengthReader().readFields(recordsToRead);
readAllFixedFields(fixedRecordsToRead);
return firstColumnStatus.getRecordsReadInCurrentPass();

// Sanity check that the fixed readers read the expected number of rows
Preconditions.checkArgument(firstColumnStatus == null
|| firstColumnStatus.getRecordsReadInCurrentPass() == readState.getValuesReadInCurrentPass());

readState.updateCounts((int) fixedRecordsToRead);

return readState.getValuesReadInCurrentPass();
}
}
}
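To make the "lowest value wins" behavior described in VariableWidthReader concrete, here is a toy, self-contained illustration (plain Java, not Drill API): each variable-length reader may fit a different number of records within its memory budget, and since the batch must advance all columns by the same row count, the minimum is used.

public class MinBatchSizeExample {
  public static void main(String[] args) {
    // Hypothetical per-reader record capacities for one batch.
    int[] readerCapacities = {4096, 3000, 3500};
    int batchRecordCount = Integer.MAX_VALUE;
    for (int capacity : readerCapacities) {
      batchRecordCount = Math.min(batchRecordCount, capacity);
    }
    // batchRecordCount == 3000: every column reader then advances by exactly this
    // many rows, which the shared ReadState records via setValuesReadInCurrentPass().
    System.out.println(batchRecordCount);
  }
}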
@@ -25,9 +25,9 @@

final class BitReader extends ColumnReader<BitVector> {

BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
BitReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
boolean fixedLength, BitVector v, SchemaElement schemaElement) throws ExecutionSetupException {
super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
super(parentReader, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
}

@Override
@@ -83,7 +83,7 @@ ColumnDescriptor getColumnDescriptor() {

volatile boolean isShuttingDown; // Indicates that no new AsyncPageReader tasks should be submitted during clear()

protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
protected ColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
this.parentReader = parentReader;
this.columnDescriptor = descriptor;
