Navigation Menu

Skip to content

Commit

Permalink
DRILL-5207: Improve Parquet Scan pipelining. Add a configurable Async…
Browse files Browse the repository at this point in the history
…PageReader Queue. Enforce total size of parquet row group. Do not initialize BufferedDirectBufInputStream buffer in init. Wait for first read. Change default size of BufferedDirectBufInputStream. Do not invoke getOptions too many times in Parquet reader. Add metrics for processing time, and decoding time for varlen and fixedlen columns.

This closes #723
  • Loading branch information
parthchandra committed Feb 4, 2017
1 parent 31b5282 commit 0520101
Show file tree
Hide file tree
Showing 12 changed files with 388 additions and 155 deletions.
Expand Up @@ -160,16 +160,23 @@ public interface ExecConstants {
String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async"; String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true); OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);


// Number of pages the Async Parquet page reader will read before blocking
String PARQUET_PAGEREADER_QUEUE_SIZE = "store.parquet.reader.pagereader.queuesize";
OptionValidator PARQUET_PAGEREADER_QUEUE_SIZE_VALIDATOR = new PositiveLongValidator(PARQUET_PAGEREADER_QUEUE_SIZE, Integer.MAX_VALUE, 2);

String PARQUET_PAGEREADER_ENFORCETOTALSIZE = "store.parquet.reader.pagereader.enforceTotalSize";
OptionValidator PARQUET_PAGEREADER_ENFORCETOTALSIZE_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ENFORCETOTALSIZE, false);

String PARQUET_COLUMNREADER_ASYNC = "store.parquet.reader.columnreader.async"; String PARQUET_COLUMNREADER_ASYNC = "store.parquet.reader.columnreader.async";
OptionValidator PARQUET_COLUMNREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_COLUMNREADER_ASYNC, false); OptionValidator PARQUET_COLUMNREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_COLUMNREADER_ASYNC, false);


// Use a buffering reader for parquet page reader // Use a buffering reader for parquet page reader
String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread"; String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true); OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);


// Size in MiB of the buffer the Parquet page reader will use to read from disk. Default is 8 MiB // Size in MiB of the buffer the Parquet page reader will use to read from disk. Default is 1 MiB
String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize"; String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize";
OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024); OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 1*1024*1024);


// try to use fadvise if available // try to use fadvise if available
String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise"; String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise";
Expand Down
Expand Up @@ -273,7 +273,7 @@ public FragmentHandle getHandle() {
return fragment.getHandle(); return fragment.getHandle();
} }


private String getFragIdString() { public String getFragIdString() {
final FragmentHandle handle = getHandle(); final FragmentHandle handle = getHandle();
final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0"; final String frag = handle != null ? handle.getMajorFragmentId() + ":" + handle.getMinorFragmentId() : "0:0";
return frag; return frag;
Expand Down
Expand Up @@ -20,6 +20,7 @@
import java.util.Iterator; import java.util.Iterator;


import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.proto.UserBitShared.MetricValue; import org.apache.drill.exec.proto.UserBitShared.MetricValue;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
import org.apache.drill.exec.proto.UserBitShared.OperatorProfile.Builder; import org.apache.drill.exec.proto.UserBitShared.OperatorProfile.Builder;
Expand Down Expand Up @@ -187,6 +188,17 @@ public synchronized void batchReceived(int inputIndex, long records, boolean new
} }
} }


public String getId() {
StringBuilder s = new StringBuilder();
return s.append(this.operatorId)
.append(":")
.append("[")
.append(UserBitShared.CoreOperatorType.valueOf(operatorType))
.append("]")
.toString();
}


public OperatorProfile getProfile() { public OperatorProfile getProfile() {
final OperatorProfile.Builder b = OperatorProfile // final OperatorProfile.Builder b = OperatorProfile //
.newBuilder() // .newBuilder() //
Expand Down
Expand Up @@ -103,6 +103,8 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR, ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_ENFORCETOTALSIZE_VALIDATOR,
ExecConstants.PARQUET_COLUMNREADER_ASYNC_VALIDATOR, ExecConstants.PARQUET_COLUMNREADER_ASYNC_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR, ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR, ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
Expand Down
Expand Up @@ -41,6 +41,9 @@ public class ParquetReaderStats {


public AtomicLong timeDiskScanWait = new AtomicLong(); public AtomicLong timeDiskScanWait = new AtomicLong();
public AtomicLong timeDiskScan = new AtomicLong(); public AtomicLong timeDiskScan = new AtomicLong();
public AtomicLong timeFixedColumnRead = new AtomicLong();
public AtomicLong timeVarColumnRead = new AtomicLong();
public AtomicLong timeProcess = new AtomicLong();


public ParquetReaderStats() { public ParquetReaderStats() {
} }
Expand Down

0 comments on commit 0520101

Please sign in to comment.