DRILL-4800: Add AsyncPageReader to pipeline PageRead
Use a non-tracking input stream for Parquet scans. Make various options user configurable: the choice between the sync and async page readers, and enabling or disabling fadvise. Add Parquet scan metrics to track the time spent in various operations.
parthchandra committed Nov 3, 2016
1 parent fe2334e commit f9a443d
Showing 19 changed files with 684 additions and 173 deletions.
4 changes: 4 additions & 0 deletions distribution/src/resources/drill-override-example.conf
@@ -166,6 +166,10 @@ drill.exec: {
initial: 20000000
}
},
scan: {
threadpool_size: 8,
decode_threadpool_size: 1
},
debug.error_on_leak: true
}
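The new scan block corresponds to the boot-time keys added to ExecConstants in this commit, read through DrillConfig at startup (see BootStrapContext later in the diff). A minimal standalone sketch of reading them, assuming the usual DrillConfig bootstrap:

import org.apache.drill.common.config.DrillConfig;

public class ScanPoolConfigExample {
  public static void main(String[] args) {
    DrillConfig config = DrillConfig.create();
    // Keys match ExecConstants.SCAN_THREADPOOL_SIZE and SCAN_DECODE_THREADPOOL_SIZE below.
    double scanThreads = config.getDouble("drill.exec.scan.threadpool_size");          // 8 in the example above
    double decodeThreads = config.getDouble("drill.exec.scan.decode_threadpool_size"); // 1 in the example above
    System.out.println(scanThreads + " scan / " + decodeThreads + " decode");
  }
}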

@@ -96,6 +96,10 @@ public interface ExecConstants {
/** Size of JDBC batch queue (in batches) above which throttling begins. */
String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD =
"drill.jdbc.batch_queue_throttling_threshold";
// Thread pool size for scan threads. Used by the Parquet scan.
String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
// The size of the thread pool used by a scan to decode the data. Used by the Parquet scan.
String SCAN_DECODE_THREADPOOL_SIZE = "drill.exec.scan.decode_threadpool_size";

/**
* Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or
@@ -147,10 +151,21 @@
String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp";
OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false);

String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async";
OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true);

// Use a buffering reader for the Parquet page reader
String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread";
OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true);

// Size in bytes of the buffer the Parquet page reader will use to read from disk. Default is 4 MiB (4*1024*1024 bytes)
String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize";
OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024);

// try to use fadvise if available
String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise";
OptionValidator PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_FADVISE, false);

OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false);

String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
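For context, these validator-backed options are read back through the fragment's OptionManager when a scan is set up. A minimal sketch, assuming context is a FragmentContext; the .bool_val access pattern appears verbatim in ParquetScanBatchCreator later in this diff, while num_val for the long-valued buffer size is an assumption:

boolean asyncPageRead =
    context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
boolean bufferedRead =
    context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
long pageReaderBufferSize =
    context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val;
boolean useFadvise =
    context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;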
@@ -47,10 +47,16 @@ public abstract class OperatorContext {

public abstract ExecutorService getExecutor();

public abstract ExecutorService getScanExecutor();

public abstract ExecutorService getScanDecodeExecutor();

public abstract ExecutionControls getExecutionControls();

public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;

public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException;

/**
* Run the callable as the given proxy user.
*
@@ -50,6 +50,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private final BufferManager manager;
private DrillFileSystem fs;
private final ExecutorService executor;
private final ExecutorService scanExecutor;
private final ExecutorService scanDecodeExecutor;

/**
* This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. There
@@ -70,6 +72,8 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context)
stats = context.getStats().newOperatorStats(def, allocator);
executionControls = context.getExecutionControls();
executor = context.getDrillbitContext().getExecutor();
scanExecutor = context.getDrillbitContext().getScanExecutor();
scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
}

public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats)
@@ -81,6 +85,8 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context,
this.stats = stats;
executionControls = context.getExecutionControls();
executor = context.getDrillbitContext().getExecutor();
scanExecutor = context.getDrillbitContext().getScanExecutor();
scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor();
}

public DrillBuf replace(DrillBuf old, int newSize) {
@@ -95,10 +101,16 @@ public DrillBuf getManagedBuffer(int size) {
return manager.getManagedBuffer(size);
}

// Allow and operator to use the thread pool
// Allow an operator to use the thread pool
public ExecutorService getExecutor() {
return executor;
}

public ExecutorService getScanExecutor() {
return scanExecutor;
}

public ExecutorService getScanDecodeExecutor() {
return scanDecodeExecutor;
}

public ExecutionControls getExecutionControls() {
return executionControls;
@@ -179,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
return fs;
}

@Override
public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
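// A null OperatorStats means reads through this FileSystem are not counted
// against the operator's wait time (the "non-tracking" in the method name).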
fs = new DrillFileSystem(conf, null);
return fs;
}

}
@@ -26,6 +26,7 @@
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;

/**
@@ -47,6 +48,7 @@ public class OperatorMetricRegistry {
register(CoreOperatorType.HASH_AGGREGATE_VALUE, HashAggTemplate.Metric.class);
register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class);
register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class);
register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class);
}

private static void register(final int operatorType, final Class<? extends MetricDef> metricDef) {
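The registration above points at a Metric enum that this commit adds to ParquetRecordReader; the enum itself is outside this excerpt. A plausible shape, following the MetricDef pattern used by the other operators registered here (constant names are assumed, mirroring the ParquetReaderStats fields later in the diff):

public enum Metric implements MetricDef {
  NUM_DICT_PAGE_LOADS,   // assumed name
  NUM_DATA_PAGE_LOADS,   // assumed name
  TIME_DISK_SCAN_WAIT,   // assumed name
  TIME_DISK_SCAN;        // assumed name

  @Override
  public int metricId() {
    return ordinal();
  }
}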
@@ -131,55 +131,55 @@ public OperatorStats mergeMetrics(OperatorStats from) {
/**
* Clear stats
*/
public void clear() {
public synchronized void clear() {
processingNanos = 0l;
setupNanos = 0l;
waitNanos = 0l;
longMetrics.clear();
doubleMetrics.clear();
}

public void startSetup() {
public synchronized void startSetup() {
assert !inSetup : assertionError("starting setup");
stopProcessing();
inSetup = true;
setupMark = System.nanoTime();
}

public void stopSetup() {
public synchronized void stopSetup() {
assert inSetup : assertionError("stopping setup");
startProcessing();
setupNanos += System.nanoTime() - setupMark;
inSetup = false;
}

public void startProcessing() {
public synchronized void startProcessing() {
assert !inProcessing : assertionError("starting processing");
processingMark = System.nanoTime();
inProcessing = true;
}

public void stopProcessing() {
public synchronized void stopProcessing() {
assert inProcessing : assertionError("stopping processing");
processingNanos += System.nanoTime() - processingMark;
inProcessing = false;
}

public void startWait() {
public synchronized void startWait() {
assert !inWait : assertionError("starting waiting");
stopProcessing();
inWait = true;
waitMark = System.nanoTime();
}

public void stopWait() {
public synchronized void stopWait() {
assert inWait : assertionError("stopping waiting");
startProcessing();
waitNanos += System.nanoTime() - waitMark;
inWait = false;
}

public void batchReceived(int inputIndex, long records, boolean newSchema) {
public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) {
recordsReceivedByInput[inputIndex] += records;
batchesReceivedByInput[inputIndex]++;
if(newSchema){
@@ -20,11 +20,11 @@
import com.codahale.metrics.MetricRegistry;
import io.netty.channel.EventLoopGroup;

import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import java.util.concurrent.SynchronousQueue;
import org.apache.drill.common.DrillAutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.scanner.persistence.ScanResult;
@@ -37,6 +37,7 @@

public class BootStrapContext implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class);
private static final int MIN_SCAN_THREADPOOL_SIZE = 8; // Magic number: lower bound on the scan pool size

private final DrillConfig config;
private final EventLoopGroup loop;
@@ -45,12 +46,15 @@ public class BootStrapContext implements AutoCloseable {
private final BufferAllocator allocator;
private final ScanResult classpathScan;
private final ExecutorService executor;
private final ExecutorService scanExecutor;
private final ExecutorService scanDecodeExecutor;

public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
this.config = config;
this.classpathScan = classpathScan;
this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
"BitClient-");
// Note that metrics are stored in a static instance
this.metrics = DrillMetrics.getRegistry();
this.allocator = RootAllocatorFactory.newRoot(config);
@@ -65,12 +69,35 @@ protected void afterExecute(final Runnable r, final Throwable t) {
super.afterExecute(r, t);
}
};
// Set up two thread pools: one for reading raw data from disk and another for decoding the data.
// A good guideline is for the number of threads in the scan pool to be a multiple (fractional
// numbers are ok) of the number of disks.
// A good guideline is for the number of threads in the decode pool to be a small multiple (fractional
// numbers are ok) of the number of cores.
final int numCores = Runtime.getRuntime().availableProcessors();
final int numScanThreads = (int) (config.getDouble(ExecConstants.SCAN_THREADPOOL_SIZE));
final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE);
final int scanThreadPoolSize =
MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads;
final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads;

this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-"));
this.scanDecodeExecutor =
Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-"));
}

public ExecutorService getExecutor() {
return executor;
}

public ExecutorService getScanExecutor() {
return scanExecutor;
}

public ExecutorService getScanDecodeExecutor() {
return scanDecodeExecutor;
}

public DrillConfig getConfig() {
return config;
}
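A worked example of the pool sizing in the BootStrapContext constructor above, using the values shipped in drill-override-example.conf (threadpool_size: 8, decode_threadpool_size: 1) and an assumed 16-core machine:

// Standalone sketch of the sizing arithmetic; not the commit's code.
int numCores = 16;               // assumed Runtime.getRuntime().availableProcessors()
int numScanThreads = 8;          // drill.exec.scan.threadpool_size
int numScanDecodeThreads = 1;    // drill.exec.scan.decode_threadpool_size
int scanPoolSize = Math.max(8, numScanThreads);                // MIN_SCAN_THREADPOOL_SIZE = 8 -> 8 threads
int decodePoolSize = Math.max(numCores, numScanDecodeThreads); // max(16, 1) -> 16 threads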
@@ -172,6 +172,12 @@ public CodeCompiler getCompiler() {
public ExecutorService getExecutor() {
return context.getExecutor();
}

public ExecutorService getScanExecutor() {
return context.getScanExecutor();
}

public ExecutorService getScanDecodeExecutor() {
return context.getScanDecodeExecutor();
}

public LogicalPlanPersistence getLpPersistence() {
return lpPersistence;
Expand Down
@@ -99,8 +99,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoCloseable {
ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR,
ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR,
ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR,
ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR,
ExecConstants.ENABLE_UNION_TYPE,
@@ -17,28 +17,30 @@
*/
package org.apache.drill.exec.store.parquet;

import java.util.concurrent.atomic.AtomicLong;

public class ParquetReaderStats {

public long numDictPageHeaders;
public long numPageHeaders;
public long numDictPageLoads;
public long numPageLoads;
public long numDictPagesDecompressed;
public long numPagesDecompressed;

public long totalDictPageHeaderBytes;
public long totalPageHeaderBytes;
public long totalDictPageReadBytes;
public long totalPageReadBytes;
public long totalDictDecompressedBytes;
public long totalDecompressedBytes;

public long timeDictPageHeaders;
public long timePageHeaders;
public long timeDictPageLoads;
public long timePageLoads;
public long timeDictPagesDecompressed;
public long timePagesDecompressed;
public AtomicLong numDictPageLoads = new AtomicLong();
public AtomicLong numDataPageLoads = new AtomicLong();
public AtomicLong numDataPagesDecoded = new AtomicLong();
public AtomicLong numDictPagesDecompressed = new AtomicLong();
public AtomicLong numDataPagesDecompressed = new AtomicLong();

public AtomicLong totalDictPageReadBytes = new AtomicLong();
public AtomicLong totalDataPageReadBytes = new AtomicLong();
public AtomicLong totalDictDecompressedBytes = new AtomicLong();
public AtomicLong totalDataDecompressedBytes = new AtomicLong();

public AtomicLong timeDictPageLoads = new AtomicLong();
public AtomicLong timeDataPageLoads = new AtomicLong();
public AtomicLong timeDataPageDecode = new AtomicLong();
public AtomicLong timeDictPageDecode = new AtomicLong();
public AtomicLong timeDictPagesDecompressed = new AtomicLong();
public AtomicLong timeDataPagesDecompressed = new AtomicLong();

public AtomicLong timeDiskScanWait = new AtomicLong();
public AtomicLong timeDiskScan = new AtomicLong();

public ParquetReaderStats() {
}
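Switching the counters from plain long fields to AtomicLong makes them safe to update from multiple threads, presumably because the async page reader now updates them from scan-pool tasks rather than the operator thread. A hypothetical update site, with field names from the class above and the surrounding read loop assumed:

// Hypothetical: recording one data-page read from an async page-reader task.
ParquetReaderStats stats = new ParquetReaderStats();
long start = System.nanoTime();
long bytesRead = 0L;   // assume: filled in from the page header
// ... read the page bytes from the input stream ...
stats.numDataPageLoads.incrementAndGet();
stats.totalDataPageReadBytes.addAndGet(bytesRead);
stats.timeDataPageLoads.addAndGet(System.nanoTime() - start);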
@@ -73,10 +73,18 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan,

DrillFileSystem fs;
try {
fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
} catch(IOException e) {
throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
boolean useAsyncPageReader =
context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
if (useAsyncPageReader) {
fs = oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf());
} else {
fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
}
} catch (IOException e) {
throw new ExecutionSetupException(
String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
}

Configuration conf = new Configuration(fs.getConf());
conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
