From f9a443d8a3d8e81b7e76f161b611003d16a53a4d Mon Sep 17 00:00:00 2001
From: Parth Chandra
Date: Tue, 27 Sep 2016 14:03:35 -0700
Subject: [PATCH] DRILL-4800: Add AsyncPageReader to pipeline PageRead

Use a non-tracking input stream for Parquet scans.
Make the choice between the async and sync page reader configurable.
Make various options user configurable - choose between the sync and async page reader, enable/disable fadvise.
Add Parquet scan metrics to track the time spent in various operations.

---
 .../src/resources/drill-override-example.conf | 4 +
 .../org/apache/drill/exec/ExecConstants.java | 15 +
 .../drill/exec/ops/OperatorContext.java | 6 +
 .../drill/exec/ops/OperatorContextImpl.java | 21 +-
 .../exec/ops/OperatorMetricRegistry.java | 2 +
 .../apache/drill/exec/ops/OperatorStats.java | 16 +-
 .../drill/exec/server/BootStrapContext.java | 33 +-
 .../drill/exec/server/DrillbitContext.java | 6 +
 .../server/options/SystemOptionManager.java | 5 +-
 .../store/parquet/ParquetReaderStats.java | 42 +--
 .../parquet/ParquetScanBatchCreator.java | 14 +-
 .../columnreaders/AsyncPageReader.java | 332 ++++++++++++++++++
 .../parquet/columnreaders/ColumnReader.java | 36 +-
 .../parquet/columnreaders/PageReader.java | 160 ++++-----
 .../columnreaders/ParquetRecordReader.java | 102 +++++-
 .../columnreaders/VarLenBinaryReader.java | 6 +-
 .../BufferedDirectBufInputStream.java | 51 ++-
 .../src/main/resources/drill-module.conf | 4 +
 .../main/resources/rest/profile/profile.ftl | 2 +-
 19 files changed, 684 insertions(+), 173 deletions(-)
 create mode 100644 exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java

diff --git a/distribution/src/resources/drill-override-example.conf b/distribution/src/resources/drill-override-example.conf
index 52949db371a..4be4aa2c487 100644
--- a/distribution/src/resources/drill-override-example.conf
+++ b/distribution/src/resources/drill-override-example.conf
@@ -166,6 +166,10 @@ drill.exec: {
       initial: 20000000
     }
   },
+  scan: {
+    threadpool_size: 8,
+    decode_threadpool_size: 1
+  },
   debug.error_on_leak: true
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index ba6b0846490..a13fd717254 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -96,6 +96,10 @@ public interface ExecConstants {
   /** Size of JDBC batch queue (in batches) above which throttling begins. */
   String JDBC_BATCH_QUEUE_THROTTLING_THRESHOLD = "drill.jdbc.batch_queue_throttling_threshold";
+  // Thread pool size for scan threads. Used by the Parquet scan.
+  String SCAN_THREADPOOL_SIZE = "drill.exec.scan.threadpool_size";
+  // The size of the thread pool used by a scan to decode the data.
Used by Parquet + String SCAN_DECODE_THREADPOOL_SIZE = "drill.exec.scan.decode_threadpool_size"; /** * Currently if a query is cancelled, but one of the fragments reports the status as FAILED instead of CANCELLED or @@ -147,10 +151,21 @@ public interface ExecConstants { String PARQUET_READER_INT96_AS_TIMESTAMP = "store.parquet.reader.int96_as_timestamp"; OptionValidator PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR = new BooleanValidator(PARQUET_READER_INT96_AS_TIMESTAMP, false); + String PARQUET_PAGEREADER_ASYNC = "store.parquet.reader.pagereader.async"; + OptionValidator PARQUET_PAGEREADER_ASYNC_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_ASYNC, true); + // Use a buffering reader for parquet page reader String PARQUET_PAGEREADER_USE_BUFFERED_READ = "store.parquet.reader.pagereader.bufferedread"; OptionValidator PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_BUFFERED_READ, true); + // Size in MiB of the buffer the Parquet page reader will use to read from disk. Default is 8 MiB + String PARQUET_PAGEREADER_BUFFER_SIZE = "store.parquet.reader.pagereader.buffersize"; + OptionValidator PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR = new LongValidator(PARQUET_PAGEREADER_BUFFER_SIZE, 4*1024*1024); + + // try to use fadvise if available + String PARQUET_PAGEREADER_USE_FADVISE = "store.parquet.reader.pagereader.usefadvise"; + OptionValidator PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR = new BooleanValidator(PARQUET_PAGEREADER_USE_FADVISE, false); + OptionValidator COMPILE_SCALAR_REPLACEMENT = new BooleanValidator("exec.compile.scalar_replacement", false); String JSON_ALL_TEXT_MODE = "store.json.all_text_mode"; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index 33fa288204f..92a7269b633 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -47,10 +47,16 @@ public abstract class OperatorContext { public abstract ExecutorService getExecutor(); + public abstract ExecutorService getScanExecutor(); + + public abstract ExecutorService getScanDecodeExecutor(); + public abstract ExecutionControls getExecutionControls(); public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException; + public abstract DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException; + /** * Run the callable as the given proxy user. * diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index 85f0ccb2b86..38ddd166812 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -50,6 +50,8 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable { private final BufferManager manager; private DrillFileSystem fs; private final ExecutorService executor; + private final ExecutorService scanExecutor; + private final ExecutorService scanDecodeExecutor; /** * This lazily initialized executor service is used to submit a {@link Callable task} that needs a proxy user. 
There @@ -70,6 +72,8 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context) stats = context.getStats().newOperatorStats(def, allocator); executionControls = context.getExecutionControls(); executor = context.getDrillbitContext().getExecutor(); + scanExecutor = context.getDrillbitContext().getScanExecutor(); + scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor(); } public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, OperatorStats stats) @@ -81,6 +85,8 @@ public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, this.stats = stats; executionControls = context.getExecutionControls(); executor = context.getDrillbitContext().getExecutor(); + scanExecutor = context.getDrillbitContext().getScanExecutor(); + scanDecodeExecutor = context.getDrillbitContext().getScanDecodeExecutor(); } public DrillBuf replace(DrillBuf old, int newSize) { @@ -95,10 +101,16 @@ public DrillBuf getManagedBuffer(int size) { return manager.getManagedBuffer(size); } - // Allow and operator to use the thread pool + // Allow an operator to use the thread pool public ExecutorService getExecutor() { return executor; } + public ExecutorService getScanExecutor() { + return scanExecutor; + } + public ExecutorService getScanDecodeExecutor() { + return scanDecodeExecutor; + } public ExecutionControls getExecutionControls() { return executionControls; @@ -179,4 +191,11 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException { return fs; } + @Override + public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { + Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext"); + fs = new DrillFileSystem(conf, null); + return fs; + } + } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java index b704bb609b1..04243327127 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorMetricRegistry.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec; import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch; import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch; +import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader; import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType; /** @@ -47,6 +48,7 @@ public class OperatorMetricRegistry { register(CoreOperatorType.HASH_AGGREGATE_VALUE, HashAggTemplate.Metric.class); register(CoreOperatorType.HASH_JOIN_VALUE, HashJoinBatch.Metric.class); register(CoreOperatorType.EXTERNAL_SORT_VALUE, ExternalSortBatch.Metric.class); + register(CoreOperatorType.PARQUET_ROW_GROUP_SCAN_VALUE, ParquetRecordReader.Metric.class); } private static void register(final int operatorType, final Class metricDef) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java index 271f7342922..b565774a3ac 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java @@ -131,7 +131,7 @@ public OperatorStats mergeMetrics(OperatorStats from) { /** * Clear stats */ - 
public void clear() { + public synchronized void clear() { processingNanos = 0l; setupNanos = 0l; waitNanos = 0l; @@ -139,47 +139,47 @@ public void clear() { doubleMetrics.clear(); } - public void startSetup() { + public synchronized void startSetup() { assert !inSetup : assertionError("starting setup"); stopProcessing(); inSetup = true; setupMark = System.nanoTime(); } - public void stopSetup() { + public synchronized void stopSetup() { assert inSetup : assertionError("stopping setup"); startProcessing(); setupNanos += System.nanoTime() - setupMark; inSetup = false; } - public void startProcessing() { + public synchronized void startProcessing() { assert !inProcessing : assertionError("starting processing"); processingMark = System.nanoTime(); inProcessing = true; } - public void stopProcessing() { + public synchronized void stopProcessing() { assert inProcessing : assertionError("stopping processing"); processingNanos += System.nanoTime() - processingMark; inProcessing = false; } - public void startWait() { + public synchronized void startWait() { assert !inWait : assertionError("starting waiting"); stopProcessing(); inWait = true; waitMark = System.nanoTime(); } - public void stopWait() { + public synchronized void stopWait() { assert inWait : assertionError("stopping waiting"); startProcessing(); waitNanos += System.nanoTime() - waitMark; inWait = false; } - public void batchReceived(int inputIndex, long records, boolean newSchema) { + public synchronized void batchReceived(int inputIndex, long records, boolean newSchema) { recordsReceivedByInput[inputIndex] += records; batchesReceivedByInput[inputIndex]++; if(newSchema){ diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index 6554e3307f5..adb6323bb54 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -20,11 +20,11 @@ import com.codahale.metrics.MetricRegistry; import io.netty.channel.EventLoopGroup; +import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.SynchronousQueue; import org.apache.drill.common.DrillAutoCloseables; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.common.scanner.persistence.ScanResult; @@ -37,6 +37,7 @@ public class BootStrapContext implements AutoCloseable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BootStrapContext.class); + private static final int MIN_SCAN_THREADPOOL_SIZE = 8; // Magic num private final DrillConfig config; private final EventLoopGroup loop; @@ -45,12 +46,15 @@ public class BootStrapContext implements AutoCloseable { private final BufferAllocator allocator; private final ScanResult classpathScan; private final ExecutorService executor; + private final ExecutorService scanExecutor; + private final ExecutorService scanDecodeExecutor; public BootStrapContext(DrillConfig config, ScanResult classpathScan) { this.config = config; this.classpathScan = classpathScan; this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-"); - this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), 
"BitClient-"); + this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), + "BitClient-"); // Note that metrics are stored in a static instance this.metrics = DrillMetrics.getRegistry(); this.allocator = RootAllocatorFactory.newRoot(config); @@ -65,12 +69,35 @@ protected void afterExecute(final Runnable r, final Throwable t) { super.afterExecute(r, t); } }; + // Setup two threadpools one for reading raw data from disk and another for decoding the data + // A good guideline is to have the number threads in the scan pool to be a multiple (fractional + // numbers are ok) of the number of disks. + // A good guideline is to have the number threads in the decode pool to be a small multiple (fractional + // numbers are ok) of the number of cores. + final int numCores = Runtime.getRuntime().availableProcessors(); + final int numScanThreads = (int) (config.getDouble(ExecConstants.SCAN_THREADPOOL_SIZE)); + final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE); + final int scanThreadPoolSize = + MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads; + final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads; + + this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-")); + this.scanDecodeExecutor = + Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-")); } public ExecutorService getExecutor() { return executor; } + public ExecutorService getScanExecutor() { + return scanExecutor; + } + + public ExecutorService getScanDecodeExecutor() { + return scanDecodeExecutor; + } + public DrillConfig getConfig() { return config; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index 3eb87eaad2b..ffe6c28d09c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -172,6 +172,12 @@ public CodeCompiler getCompiler() { public ExecutorService getExecutor() { return context.getExecutor(); } + public ExecutorService getScanExecutor() { + return context.getScanExecutor(); + } + public ExecutorService getScanDecodeExecutor() { + return context.getScanDecodeExecutor(); + } public LogicalPlanPersistence getLpPersistence() { return lpPersistence; diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index 8b67fdb3502..1981d24b027 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -99,8 +99,11 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING_VALIDATOR, ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR, - ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, + ExecConstants.PARQUET_PAGEREADER_ASYNC_VALIDATOR, + ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ_VALIDATOR, + 
ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE_VALIDATOR, + ExecConstants.PARQUET_PAGEREADER_USE_FADVISE_VALIDATOR, ExecConstants.PARQUET_READER_INT96_AS_TIMESTAMP_VALIDATOR, ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR, ExecConstants.ENABLE_UNION_TYPE, diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java index e95b0c88c52..c2711cca175 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetReaderStats.java @@ -17,28 +17,30 @@ */ package org.apache.drill.exec.store.parquet; +import java.util.concurrent.atomic.AtomicLong; + public class ParquetReaderStats { - public long numDictPageHeaders; - public long numPageHeaders; - public long numDictPageLoads; - public long numPageLoads; - public long numDictPagesDecompressed; - public long numPagesDecompressed; - - public long totalDictPageHeaderBytes; - public long totalPageHeaderBytes; - public long totalDictPageReadBytes; - public long totalPageReadBytes; - public long totalDictDecompressedBytes; - public long totalDecompressedBytes; - - public long timeDictPageHeaders; - public long timePageHeaders; - public long timeDictPageLoads; - public long timePageLoads; - public long timeDictPagesDecompressed; - public long timePagesDecompressed; + public AtomicLong numDictPageLoads = new AtomicLong(); + public AtomicLong numDataPageLoads = new AtomicLong(); + public AtomicLong numDataPagesDecoded = new AtomicLong(); + public AtomicLong numDictPagesDecompressed = new AtomicLong(); + public AtomicLong numDataPagesDecompressed = new AtomicLong(); + + public AtomicLong totalDictPageReadBytes = new AtomicLong(); + public AtomicLong totalDataPageReadBytes = new AtomicLong(); + public AtomicLong totalDictDecompressedBytes = new AtomicLong(); + public AtomicLong totalDataDecompressedBytes = new AtomicLong(); + + public AtomicLong timeDictPageLoads = new AtomicLong(); + public AtomicLong timeDataPageLoads = new AtomicLong(); + public AtomicLong timeDataPageDecode = new AtomicLong(); + public AtomicLong timeDictPageDecode = new AtomicLong(); + public AtomicLong timeDictPagesDecompressed = new AtomicLong(); + public AtomicLong timeDataPagesDecompressed = new AtomicLong(); + + public AtomicLong timeDiskScanWait = new AtomicLong(); + public AtomicLong timeDiskScan = new AtomicLong(); public ParquetReaderStats() { } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index a98c66083c2..a14bab52dfe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -73,10 +73,18 @@ public ScanBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupS DrillFileSystem fs; try { - fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf()); - } catch(IOException e) { - throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e); + boolean useAsyncPageReader = + context.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val; + if (useAsyncPageReader) { + fs = 
oContext.newNonTrackingFileSystem(rowGroupScan.getStorageEngine().getFsConf()); + } else { + fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf()); + } + } catch (IOException e) { + throw new ExecutionSetupException( + String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e); } + Configuration conf = new Configuration(fs.getConf()); conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false); conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java new file mode 100644 index 00000000000..3f47f04b6c5 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.store.parquet.columnreaders; + +import com.google.common.base.Stopwatch; +import io.netty.buffer.DrillBuf; +import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; +import org.apache.drill.exec.util.filereader.DirectBufInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.page.DictionaryPage; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.PageType; +import org.apache.parquet.format.Util; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; + +import java.io.IOException; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.apache.parquet.column.Encoding.valueOf; + +class AsyncPageReader extends PageReader { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AsyncPageReader.class); + + + private ExecutorService threadPool; + private Future asyncPageRead; + + AsyncPageReader(ColumnReader parentStatus, FileSystem fs, Path path, + ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { + super(parentStatus, fs, path, columnChunkMetaData); + if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + @Override protected void loadDictionaryIfExists(final ColumnReader parentStatus, + final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { + if (columnChunkMetaData.getDictionaryPageOffset() > 0) { + try { + 
dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); + } catch (IOException e) { + handleAndThrowException(e, "Error Reading dictionary page."); + } + // parent constructor may call this method before the thread pool is set. + if (threadPool == null) { + threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor(); + } + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + readDictionaryPage(asyncPageRead, parentStatus); + asyncPageRead = null; // reset after consuming + } + } + + private DrillBuf getDecompressedPageData(ReadStatus readStatus) { + DrillBuf data; + boolean isDictionary = false; + synchronized (this) { + data = readStatus.getPageData(); + readStatus.setPageData(null); + isDictionary = readStatus.isDictionaryPage; + } + if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { + DrillBuf uncompressedData = data; + data = decompress(readStatus.getPageHeader(), uncompressedData); + synchronized (this) { + readStatus.setPageData(null); + } + uncompressedData.release(); + } else { + if (isDictionary) { + stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead); + } else { + stats.totalDataPageReadBytes.addAndGet(readStatus.bytesRead); + } + } + return data; + } + + // Read and decode the dictionary and the header + private void readDictionaryPage(final Future asyncPageRead, + final ColumnReader parentStatus) throws UserException { + try { + Stopwatch timer = Stopwatch.createStarted(); + ReadStatus readStatus = asyncPageRead.get(); + long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS); + stats.timeDiskScanWait.addAndGet(timeBlocked); + stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime()); + stats.numDictPageLoads.incrementAndGet(); + stats.timeDictPageLoads.addAndGet(timeBlocked+readStatus.getDiskScanTime()); + readDictionaryPageData(readStatus, parentStatus); + } catch (Exception e) { + handleAndThrowException(e, "Error reading dictionary page."); + } + } + + // Read and decode the dictionary data + private void readDictionaryPageData(final ReadStatus readStatus, final ColumnReader parentStatus) + throws UserException { + try { + pageHeader = readStatus.getPageHeader(); + int uncompressedSize = pageHeader.getUncompressed_page_size(); + final DrillBuf dictionaryData = getDecompressedPageData(readStatus); + Stopwatch timer = Stopwatch.createStarted(); + allocatedDictionaryBuffers.add(dictionaryData); + DictionaryPage page = new DictionaryPage(asBytesInput(dictionaryData, 0, uncompressedSize), + pageHeader.uncompressed_page_size, pageHeader.dictionary_page_header.num_values, + valueOf(pageHeader.dictionary_page_header.encoding.name())); + this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); + long timeToDecode = timer.elapsed(TimeUnit.NANOSECONDS); + stats.timeDictPageDecode.addAndGet(timeToDecode); + } catch (Exception e) { + handleAndThrowException(e, "Error decoding dictionary page."); + } + } + + private void handleAndThrowException(Exception e, String msg) throws UserException { + UserException ex = UserException.dataReadError(e).message(msg) + .pushContext("Row Group Start: ", this.parentColumnReader.columnChunkMetaData.getStartingPos()) + .pushContext("Column: ", this.parentColumnReader.schemaElement.getName()) + .pushContext("File: ", this.fileName).build(logger); + throw ex; + } + + private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) { + DrillBuf pageDataBuf = null; + Stopwatch timer = 
Stopwatch.createUnstarted(); + long timeToRead; + int compressedSize = pageHeader.getCompressed_page_size(); + int uncompressedSize = pageHeader.getUncompressed_page_size(); + pageDataBuf = allocateTemporaryBuffer(uncompressedSize); + try { + timer.start(); + codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData.getCodec()) + .decompress(compressedData.nioBuffer(0, compressedSize), compressedSize, + pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize); + timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize); + } catch (IOException e) { + handleAndThrowException(e, "Error decompressing data."); + } + return pageDataBuf; + } + + @Override protected void nextInternal() throws IOException { + ReadStatus readStatus = null; + try { + Stopwatch timer = Stopwatch.createStarted(); + readStatus = asyncPageRead.get(); + long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS); + stats.timeDiskScanWait.addAndGet(timeBlocked); + stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime()); + if (readStatus.isDictionaryPage) { + stats.numDictPageLoads.incrementAndGet(); + stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime()); + } else { + stats.numDataPageLoads.incrementAndGet(); + stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime()); + } + pageHeader = readStatus.getPageHeader(); + // reset this. At the time of calling close, if this is not null then a pending asyncPageRead needs to be consumed + asyncPageRead = null; + } catch (Exception e) { + handleAndThrowException(e, "Error reading page data."); + } + + // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one + // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary + + do { + if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { + readDictionaryPageData(readStatus, parentColumnReader); + // Ugly. Use the Async task to make a synchronous read call. + readStatus = new AsyncPageReaderTask().call(); + pageHeader = readStatus.getPageHeader(); + } + } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); + + if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead() + < parentColumnReader.columnChunkMetaData.getValueCount()) { + asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); + } + + pageHeader = readStatus.getPageHeader(); + pageData = getDecompressedPageData(readStatus); + + } + + + @Override public void clear() { + if (asyncPageRead != null) { + asyncPageRead.cancel(true); + try { + ReadStatus r = asyncPageRead.get(); + r.getPageData().release(); + } catch (Exception e) { + // Do nothing. 
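+        // Expected if the in-flight read was cancelled or failed: get() then throws and the
+        // task's own error handling has already released any buffer it allocated.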
+ } + } + super.clear(); + } + + public static class ReadStatus { + private PageHeader pageHeader; + private DrillBuf pageData; + private boolean isDictionaryPage = false; + private long bytesRead = 0; + private long valuesRead = 0; + private long diskScanTime = 0; + + public synchronized PageHeader getPageHeader() { + return pageHeader; + } + + public synchronized void setPageHeader(PageHeader pageHeader) { + this.pageHeader = pageHeader; + } + + public synchronized DrillBuf getPageData() { + return pageData; + } + + public synchronized void setPageData(DrillBuf pageData) { + this.pageData = pageData; + } + + public synchronized boolean isDictionaryPage() { + return isDictionaryPage; + } + + public synchronized void setIsDictionaryPage(boolean isDictionaryPage) { + this.isDictionaryPage = isDictionaryPage; + } + + public synchronized long getBytesRead() { + return bytesRead; + } + + public synchronized void setBytesRead(long bytesRead) { + this.bytesRead = bytesRead; + } + + public synchronized long getValuesRead() { + return valuesRead; + } + + public synchronized void setValuesRead(long valuesRead) { + this.valuesRead = valuesRead; + } + + public long getDiskScanTime() { + return diskScanTime; + } + + public void setDiskScanTime(long diskScanTime) { + this.diskScanTime = diskScanTime; + } + } + + + private class AsyncPageReaderTask implements Callable { + + private final AsyncPageReader parent = AsyncPageReader.this; + + public AsyncPageReaderTask() { + } + + @Override public ReadStatus call() throws IOException { + ReadStatus readStatus = new ReadStatus(); + + String oldname = Thread.currentThread().getName(); + Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString()); + + long bytesRead = 0; + long valuesRead = 0; + Stopwatch timer = Stopwatch.createStarted(); + + DrillBuf pageData = null; + try { + PageHeader pageHeader = Util.readPageHeader(parent.dataReader); + int compressedSize = pageHeader.getCompressed_page_size(); + pageData = parent.dataReader.getNext(compressedSize); + bytesRead = compressedSize; + synchronized (parent) { + if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { + readStatus.setIsDictionaryPage(true); + valuesRead += pageHeader.getDictionary_page_header().getNum_values(); + } else { + valuesRead += pageHeader.getData_page_header().getNum_values(); + } + long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); + readStatus.setPageHeader(pageHeader); + readStatus.setPageData(pageData); + readStatus.setBytesRead(bytesRead); + readStatus.setValuesRead(valuesRead); + readStatus.setDiskScanTime(timeToRead); + } + + } catch (Exception e) { + if (pageData != null) { + pageData.release(); + } + throw e; + } + Thread.currentThread().setName(oldname); + return readStatus; + } + + } + +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index f62f42424ea..6572c78cc1f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.common.exceptions.UserException; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.vector.BaseDataValueVector; import 
org.apache.drill.exec.vector.ValueVector; @@ -70,7 +72,7 @@ ColumnDescriptor getColumnDescriptor() { protected DrillBuf vectorData; // when reading definition levels for nullable columns, it is a one-way stream of integers // when reading var length data, where we don't know if all of the records will fit until we've read all of them - // we must store the last definition level an use it in at the start of the next batch + // we must store the last definition level and use it at the start of the next batch int currDefLevel; // variables for a single read pass @@ -84,7 +86,17 @@ protected ColumnReader(ParquetRecordReader parentReader, int allocateSize, Colum this.isFixedLength = fixedLength; this.schemaElement = schemaElement; this.valueVec = v; - this.pageReader = new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), columnChunkMetaData); + boolean useAsyncPageReader = parentReader.getFragmentContext().getOptions() + .getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val; + if (useAsyncPageReader) { + this.pageReader = + new AsyncPageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), + columnChunkMetaData); + } else { + this.pageReader = + new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath(), + columnChunkMetaData); + } if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) { if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) { @@ -117,11 +129,23 @@ public void clear() { } public void readValues(long recordsToRead) { - readField(recordsToRead); + try { + readField(recordsToRead); + + valuesReadInCurrentPass += recordsReadInThisIteration; + pageReader.valuesRead += recordsReadInThisIteration; + pageReader.readPosInBytes = readStartInBytes + readLength; + } catch (Exception e) { + UserException ex = UserException.dataReadError(e) + .message("Error reading from Parquet file") + .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos()) + .pushContext("Column: ", this.schemaElement.getName()) + .pushContext("File: ", this.parentReader.getHadoopPath().toString() ) + .build(logger); + throw ex; + + } - valuesReadInCurrentPass += recordsReadInThisIteration; - pageReader.valuesRead += recordsReadInThisIteration; - pageReader.readPosInBytes = readStartInBytes + readLength; } protected abstract void readField(long recordsToRead); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 078e4ceb934..c34ebd1434a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -17,7 +17,6 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; -import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream; @@ -36,7 +35,6 @@ import org.apache.parquet.column.Encoding; import org.apache.parquet.column.ValuesType; import org.apache.parquet.column.page.DictionaryPage; -import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.dictionary.DictionaryValuesReader; import org.apache.parquet.format.PageHeader; @@ -58,25 +56,23 @@ import 
static org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics; // class to keep track of the read position of variable length columns -final class PageReader { +class PageReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger( org.apache.drill.exec.store.parquet.columnreaders.PageReader.class); public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter; - private final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentColumnReader; + protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentColumnReader; //private final ColumnDataReader dataReader; - private final DirectBufInputStream dataReader; + protected final DirectBufInputStream dataReader; //der; buffer to store bytes of current page - DrillBuf pageData; + protected DrillBuf pageData; // for variable length data we need to keep track of our current position in the page data // as the values and lengths are intermixed, making random access to the length data impossible long readyToReadPosInBytes; // read position in the current page, stored in the ByteBuf in ParquetRecordReader called bufferWithAllData long readPosInBytes; - // bit shift needed for the next page if the last one did not line up with a byte boundary - int bitShift; // storage space for extra bits at the end of a page if they did not line up with a byte boundary // prevents the need to keep the entire last page, as these pageDataByteArray need to be added to the next batch //byte extraBits; @@ -103,14 +99,18 @@ final class PageReader { int currentPageCount = -1; - private FSDataInputStream inputStream; + protected FSDataInputStream inputStream; // These need to be held throughout reading of the entire column chunk List allocatedDictionaryBuffers; - private final CodecFactory codecFactory; + protected final CodecFactory codecFactory; + protected final String fileName; - private final ParquetReaderStats stats; + protected final ParquetReaderStats stats; + private final boolean useBufferedReader; + private final int scanBufferSize; + private final boolean useFadvise; PageReader(org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException { @@ -118,21 +118,24 @@ final class PageReader { allocatedDictionaryBuffers = new ArrayList(); codecFactory = parentColumnReader.parentReader.getCodecFactory(); this.stats = parentColumnReader.parentReader.parquetReaderStats; - long start = columnChunkMetaData.getFirstDataPageOffset(); + this.fileName = path.toString(); try { inputStream = fs.open(path); BufferAllocator allocator = parentColumnReader.parentReader.getOperatorContext().getAllocator(); - //TODO: make read batch size configurable columnChunkMetaData.getTotalUncompressedSize(); - boolean useBufferedReader = parentColumnReader.parentReader.getFragmentContext().getOptions() + useBufferedReader = parentColumnReader.parentReader.getFragmentContext().getOptions() .getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val; + scanBufferSize = parentColumnReader.parentReader.getFragmentContext().getOptions() + .getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue(); + useFadvise = parentColumnReader.parentReader.getFragmentContext().getOptions() + .getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val; if (useBufferedReader) { this.dataReader = new 
BufferedDirectBufInputStream(inputStream, allocator, path.getName(), - columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), 8 * 1024 * 1024, - true); + columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), scanBufferSize, + useFadvise); } else { this.dataReader = new DirectBufInputStream(inputStream, allocator, path.getName(), - columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), true); + columnChunkMetaData.getStartingPos(), columnChunkMetaData.getTotalSize(), useFadvise); } dataReader.init(); @@ -145,7 +148,7 @@ final class PageReader { } - private void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentStatus, + protected void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentStatus, final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws IOException { Stopwatch timer = Stopwatch.createUnstarted(); if (columnChunkMetaData.getDictionaryPageOffset() > 0) { @@ -153,7 +156,7 @@ private void loadDictionaryIfExists(final org.apache.drill.exec.store.parquet.co long start=dataReader.getPos(); timer.start(); final PageHeader pageHeader = Util.readPageHeader(f); - long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); long pageHeaderBytes=dataReader.getPos()-start; this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes); assert pageHeader.type == PageType.DICTIONARY_PAGE; @@ -178,7 +181,7 @@ private void readDictionaryPage(final PageHeader pageHeader, this.dictionary = page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); } - public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException { + private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize) throws IOException { DrillBuf pageDataBuf = null; Stopwatch timer = Stopwatch.createUnstarted(); long timeToRead; @@ -186,7 +189,7 @@ public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompre if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) { timer.start(); pageDataBuf = dataReader.getNext(compressedSize); - timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize); } else { DrillBuf compressedData = null; @@ -195,8 +198,7 @@ public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompre try { timer.start(); compressedData = dataReader.getNext(compressedSize); - // dataReader.loadPage(compressedData, compressedSize); - timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); timer.reset(); this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, compressedSize); start=dataReader.getPos(); @@ -204,7 +206,7 @@ public DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompre codecFactory.getDecompressor(parentColumnReader.columnChunkMetaData .getCodec()).decompress(compressedData.nioBuffer(0, compressedSize), compressedSize, pageDataBuf.nioBuffer(0, uncompressedSize), uncompressedSize); - timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); this.updateStats(pageHeader, "Decompress", start, timeToRead, compressedSize, 
uncompressedSize); } finally { if(compressedData != null) { @@ -219,25 +221,12 @@ public static BytesInput asBytesInput(DrillBuf buf, int offset, int length) thro return BytesInput.from(buf.nioBuffer(offset, length), 0, length); } + /** - * Grab the next page. - * - * @return - if another page was present - * @throws IOException + * Get the page header and the pageData (uncompressed) for the next page */ - public boolean next() throws IOException { + protected void nextInternal() throws IOException{ Stopwatch timer = Stopwatch.createUnstarted(); - currentPageCount = -1; - valuesRead = 0; - valuesReadyToRead = 0; - - // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause - // and submit a bug report - if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { - return false; - } - clearBuffers(); - // next, we need to decompress the bytes // TODO - figure out if we need multiple dictionary pages, I believe it may be limited to one // I think we are clobbering parts of the dictionary if there can be multiple pages of dictionary @@ -245,7 +234,7 @@ public boolean next() throws IOException { long start=dataReader.getPos(); timer.start(); pageHeader = Util.readPageHeader(dataReader); - long timeToRead = timer.elapsed(TimeUnit.MICROSECONDS); + long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); long pageHeaderBytes=dataReader.getPos()-start; this.updateStats(pageHeader, "Page Header", start, timeToRead, pageHeaderBytes, pageHeaderBytes); logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}","Page Header Read","", @@ -264,14 +253,33 @@ public boolean next() throws IOException { int uncompressedSize = pageHeader.getUncompressed_page_size(); pageData = readPage(pageHeader, compressedSize, uncompressedSize); - currentPageCount = pageHeader.data_page_header.num_values; - final int uncompressedPageSize = pageHeader.uncompressed_page_size; - final Statistics stats = fromParquetStatistics(pageHeader.data_page_header.getStatistics(), parentColumnReader - .getColumnDescriptor().getType()); + } + + /** + * Grab the next page. + * + * @return - if another page was present + * @throws IOException + */ + public boolean next() throws IOException { + Stopwatch timer = Stopwatch.createUnstarted(); + currentPageCount = -1; + valuesRead = 0; + valuesReadyToRead = 0; + // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause + // and submit a bug report + if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { + return false; + } + clearBuffers(); - final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding); + nextInternal(); + timer.start(); + currentPageCount = pageHeader.data_page_header.num_values; + + final Encoding rlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.repetition_level_encoding); final Encoding dlEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.definition_level_encoding); final Encoding valueEncoding = METADATA_CONVERTER.getEncoding(pageHeader.data_page_header.encoding); @@ -321,40 +329,24 @@ public boolean next() throws IOException { // fit one record at a time, such as for variable length data. 
Both operations must start in the same location after the // definition and repetition level data which is stored alongside the page data itself readyToReadPosInBytes = readPosInBytes; + long timeDecode = timer.elapsed(TimeUnit.NANOSECONDS); + stats.numDataPagesDecoded.incrementAndGet(); + stats.timeDataPageDecode.addAndGet(timeDecode); return true; } - /** - * Allocate a page data buffer. Note that only one page data buffer should be active at a time. The reader will ensure - * that the page data is released after the reader is completed. - */ - private void allocatePageData(int size) { - Preconditions.checkArgument(pageData == null); - pageData = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size); - } - /** * Allocate a buffer which the user should release immediately. The reader does not manage release of these buffers. */ - private DrillBuf allocateTemporaryBuffer(int size) { + protected DrillBuf allocateTemporaryBuffer(int size) { return parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size); } - /** - * Allocate and return a dictionary buffer. These are maintained for the life of the reader and then released when the - * reader is cleared. - */ - private DrillBuf allocateDictionaryBuffer(int size) { - DrillBuf buf = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size); - allocatedDictionaryBuffers.add(buf); - return buf; - } - protected boolean hasPage() { return currentPageCount != -1; } - private void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) { + protected void updateStats(PageHeader pageHeader, String op, long start, long time, long bytesin, long bytesout) { String pageType = "Data Page"; if (pageHeader.type == PageType.DICTIONARY_PAGE) { pageType = "Dictionary Page"; @@ -362,37 +354,38 @@ private void updateStats(PageHeader pageHeader, String op, long start, long time logger.trace("ParquetTrace,{},{},{},{},{},{},{},{}", op, pageType.toString(), this.parentColumnReader.parentReader.hadoopPath, this.parentColumnReader.columnDescriptor.toString(), start, bytesin, bytesout, time); + if (pageHeader.type != PageType.DICTIONARY_PAGE) { if (bytesin == bytesout) { - this.stats.timePageLoads += time; - this.stats.numPageLoads++; - this.stats.totalPageReadBytes += bytesin; + this.stats.timeDataPageLoads.addAndGet(time); + this.stats.numDataPageLoads.incrementAndGet(); + this.stats.totalDataPageReadBytes.addAndGet(bytesin); } else { - this.stats.timePagesDecompressed += time; - this.stats.numPagesDecompressed++; - this.stats.totalDecompressedBytes += bytesin; + this.stats.timeDataPagesDecompressed.addAndGet(time); + this.stats.numDataPagesDecompressed.incrementAndGet(); + this.stats.totalDataDecompressedBytes.addAndGet(bytesin); } } else { if (bytesin == bytesout) { - this.stats.timeDictPageLoads += time; - this.stats.numDictPageLoads++; - this.stats.totalDictPageReadBytes += bytesin; + this.stats.timeDictPageLoads.addAndGet(time); + this.stats.numDictPageLoads.incrementAndGet(); + this.stats.totalDictPageReadBytes.addAndGet(bytesin); } else { - this.stats.timeDictPagesDecompressed += time; - this.stats.numDictPagesDecompressed++; - this.stats.totalDictDecompressedBytes += bytesin; + this.stats.timeDictPagesDecompressed.addAndGet(time); + this.stats.numDictPagesDecompressed.incrementAndGet(); + this.stats.totalDictDecompressedBytes.addAndGet(bytesin); } } } - public void clearBuffers() { + protected void clearBuffers() { if (pageData != null) { 
pageData.release(); pageData = null; } } - public void clearDictionaryBuffers() { + protected void clearDictionaryBuffers() { for (ByteBuf b : allocatedDictionaryBuffers) { b.release(); } @@ -401,15 +394,14 @@ public void clearDictionaryBuffers() { public void clear(){ try { + this.inputStream.close(); this.dataReader.close(); } catch (IOException e) { - //TODO: Throw UserException + //Swallow the exception which is OK for input streams } // Free all memory, including fixed length types. (Data is being copied for all types not just var length types) - //if(!this.parentColumnReader.isFixedLength) { clearBuffers(); clearDictionaryBuffers(); - //} } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 924887edab7..1eca00fa264 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -35,6 +35,7 @@ import org.apache.drill.exec.exception.OutOfMemoryException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.MetricDef; import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; @@ -117,16 +118,39 @@ public class ParquetRecordReader extends AbstractRecordReader { public ParquetReaderStats parquetReaderStats = new ParquetReaderStats(); + public enum Metric implements MetricDef { + NUM_DICT_PAGE_LOADS, // Number of dictionary pages read + NUM_DATA_PAGE_lOADS, // Number of data pages read + NUM_DATA_PAGES_DECODED, // Number of data pages decoded + NUM_DICT_PAGES_DECOMPRESSED, // Number of dictionary pages decompressed + NUM_DATA_PAGES_DECOMPRESSED, // Number of data pages decompressed + TOTAL_DICT_PAGE_READ_BYTES, // Total bytes read from disk for dictionary pages + TOTAL_DATA_PAGE_READ_BYTES, // Total bytes read from disk for data pages + TOTAL_DICT_DECOMPRESSED_BYTES, // Total bytes decompressed for dictionary pages (same as compressed bytes on disk) + TOTAL_DATA_DECOMPRESSED_BYTES, // Total bytes decompressed for data pages (same as compressed bytes on disk) + TIME_DICT_PAGE_LOADS, // Time in nanos in reading dictionary pages from disk + TIME_DATA_PAGE_LOADS, // Time in nanos in reading data pages from disk + TIME_DATA_PAGE_DECODE, // Time in nanos in decoding data pages + TIME_DICT_PAGE_DECODE, // Time in nanos in decoding dictionary pages + TIME_DICT_PAGES_DECOMPRESSED, // Time in nanos in decompressing dictionary pages + TIME_DATA_PAGES_DECOMPRESSED, // Time in nanos in decompressing data pages + TIME_DISK_SCAN_WAIT, // Time in nanos spent in waiting for an async disk read to complete + TIME_DISK_SCAN; // Time in nanos spent in reading data from disk. 
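+    // Note: metricId() below returns ordinal(), so each metric's id is simply its position in this enum.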
+ + @Override public int metricId() { + return ordinal(); + } + } + public ParquetRecordReader(FragmentContext fragmentContext, String path, int rowGroupIndex, - long numRecordsToRead, + long numRecordsToRead, FileSystem fs, CodecFactory codecFactory, ParquetMetadata footer, - List columns, - ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) - throws ExecutionSetupException { + List columns, + ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) throws ExecutionSetupException { this(fragmentContext, DEFAULT_BATCH_LENGTH_IN_BITS, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus); } @@ -470,6 +494,7 @@ public int next() { // No columns found in the file were selected, simply return a full batch of null records for each column requested if (firstColumnStatus == null) { if (mockRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount()) { + updateStats(); return 0; } recordsToRead = Math.min(DEFAULT_RECORDS_TO_READ_IF_NOT_FIXED_WIDTH, footer.getBlocks().get(rowGroupIndex).getRowCount() - mockRecordsRead); @@ -483,6 +508,7 @@ public int next() { mockRecordsRead += recordsToRead; totalRecordsRead += recordsToRead; numRecordsToRead -= recordsToRead; + updateStats(); return (int) recordsToRead; } @@ -514,6 +540,7 @@ public int next() { // logger.debug("So far read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath()); totalRecordsRead += firstColumnStatus.getRecordsReadInCurrentPass(); numRecordsToRead -= firstColumnStatus.getRecordsReadInCurrentPass(); + updateStats(); return firstColumnStatus.getRecordsReadInCurrentPass(); } catch (Exception e) { handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() + @@ -530,7 +557,8 @@ public int next() { @Override public void close() { - logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, hadoopPath.toUri().getPath()); + logger.debug("Read {} records out of row group({}) in file '{}'", totalRecordsRead, rowGroupIndex, + hadoopPath.toUri().getPath()); // enable this for debugging when it is know that a whole file will be read // limit kills upstream operators once it has enough records, so this assert will fail // assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount(); @@ -552,29 +580,67 @@ public void close() { varLengthReader = null; } + if(parquetReaderStats != null) { - logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", + logger.trace("ParquetTrace,Summary,{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}", hadoopPath, - parquetReaderStats.numDictPageHeaders, - parquetReaderStats.numPageHeaders, parquetReaderStats.numDictPageLoads, - parquetReaderStats.numPageLoads, + parquetReaderStats.numDataPageLoads, + parquetReaderStats.numDataPagesDecoded, parquetReaderStats.numDictPagesDecompressed, - parquetReaderStats.numPagesDecompressed, - parquetReaderStats.totalDictPageHeaderBytes, - parquetReaderStats.totalPageHeaderBytes, + parquetReaderStats.numDataPagesDecompressed, parquetReaderStats.totalDictPageReadBytes, - parquetReaderStats.totalPageReadBytes, + parquetReaderStats.totalDataPageReadBytes, parquetReaderStats.totalDictDecompressedBytes, - parquetReaderStats.totalDecompressedBytes, - parquetReaderStats.timeDictPageHeaders, - parquetReaderStats.timePageHeaders, + parquetReaderStats.totalDataDecompressedBytes, parquetReaderStats.timeDictPageLoads, - parquetReaderStats.timePageLoads, + 
parquetReaderStats.timeDataPageLoads, + parquetReaderStats.timeDataPageDecode, + parquetReaderStats.timeDictPageDecode, parquetReaderStats.timeDictPagesDecompressed, - parquetReaderStats.timePagesDecompressed); + parquetReaderStats.timeDataPagesDecompressed, + parquetReaderStats.timeDiskScanWait, + parquetReaderStats.timeDiskScan + ); parquetReaderStats=null; } + + } + + private void updateStats(){ + + operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGE_LOADS, + parquetReaderStats.numDictPageLoads.longValue()); + operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGE_lOADS, parquetReaderStats.numDataPageLoads.longValue()); + operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECODED, parquetReaderStats.numDataPagesDecoded.longValue()); + operatorContext.getStats().setLongStat(Metric.NUM_DICT_PAGES_DECOMPRESSED, + parquetReaderStats.numDictPagesDecompressed.longValue()); + operatorContext.getStats().setLongStat(Metric.NUM_DATA_PAGES_DECOMPRESSED, + parquetReaderStats.numDataPagesDecompressed.longValue()); + operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_PAGE_READ_BYTES, + parquetReaderStats.totalDictPageReadBytes.longValue()); + operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_PAGE_READ_BYTES, + parquetReaderStats.totalDataPageReadBytes.longValue()); + operatorContext.getStats().setLongStat(Metric.TOTAL_DICT_DECOMPRESSED_BYTES, + parquetReaderStats.totalDictDecompressedBytes.longValue()); + operatorContext.getStats().setLongStat(Metric.TOTAL_DATA_DECOMPRESSED_BYTES, + parquetReaderStats.totalDataDecompressedBytes.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_LOADS, + parquetReaderStats.timeDictPageLoads.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_LOADS, + parquetReaderStats.timeDataPageLoads.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGE_DECODE, + parquetReaderStats.timeDataPageDecode.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGE_DECODE, + parquetReaderStats.timeDictPageDecode.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DICT_PAGES_DECOMPRESSED, + parquetReaderStats.timeDictPagesDecompressed.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DATA_PAGES_DECOMPRESSED, + parquetReaderStats.timeDataPagesDecompressed.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN_WAIT, + parquetReaderStats.timeDiskScanWait.longValue()); + operatorContext.getStats().setLongStat(Metric.TIME_DISK_SCAN, parquetReaderStats.timeDiskScan.longValue()); + } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java index 6ca0205174e..e03d930f53e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java @@ -17,15 +17,17 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import org.apache.drill.exec.vector.ValueVector; + import java.io.IOException; import java.util.List; public class VarLenBinaryReader { ParquetRecordReader parentReader; - final List> columns; + final List> columns; - public VarLenBinaryReader(ParquetRecordReader parentReader, List> columns) { + public VarLenBinaryReader(ParquetRecordReader parentReader, List> columns) { 
this.parentReader = parentReader; this.columns = columns; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java index 6aa968aa416..a5a6b8179d5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java @@ -23,7 +23,6 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.RootAllocatorFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -73,7 +72,7 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement /** * The current read position in the buffer; the index of the next * character to be read from the internalBuffer array. - *

+ *

 * This value is always in the range [0,count].
 * If curPosInBuffer is equal to count then we have read
 * all the buffered data and the next read (or skip) will require more data to be read
@@ -128,8 +127,7 @@ public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, S
   }
 
-  @Override
-  public void init() throws UnsupportedOperationException, IOException {
+  @Override public void init() throws UnsupportedOperationException, IOException {
     super.init();
     this.internalBuffer = this.allocator.buffer(this.bufSize);
     this.tempBuffer = this.allocator.buffer(defaultTempBufferSize);
@@ -180,10 +178,10 @@ private int getNextBlock() throws IOException {
         this.curPosInStream = getInputStream().getPos();
         bytesRead = nBytes;
         logger.trace(
-            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " +
-                "CurPosInStream: {}, CurPosInBuffer: {}",
-            this.streamId, this.startOffset, this.totalByteSize, this.bufSize, bytesRead, this.count,
-            this.curPosInStream, this.curPosInBuffer);
+            "Stream: {}, StartOffset: {}, TotalByteSize: {}, BufferSize: {}, BytesRead: {}, Count: {}, " +
+                "CurPosInStream: {}, CurPosInBuffer: {}", this.streamId, this.startOffset,
+            this.totalByteSize, this.bufSize, bytesRead, this.count, this.curPosInStream,
+            this.curPosInBuffer);
       }
     }
     return this.count - this.curPosInBuffer;
@@ -252,8 +250,8 @@ public synchronized int read() throws IOException {
   }
 
   /**
-   Has the same contract as {@link java.io.InputStream#read(byte[], int, int)}
-   Except with DrillBuf
+   * Has the same contract as {@link java.io.InputStream#read(byte[], int, int)}
+   * Except with DrillBuf
   */
   public synchronized int read(DrillBuf buf, int off, int len) throws IOException {
     checkInputStreamState();
@@ -296,7 +294,7 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException
       return 0;
     }
     DrillBuf byteBuf;
-    if(len <= defaultTempBufferSize){
+    if (len <= defaultTempBufferSize) {
      byteBuf = tempBuffer;
    } else {
      byteBuf = this.allocator.buffer(len);
@@ -318,7 +316,7 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException
      }
    } while (bytesRead < len);
 
-    if(len > defaultTempBufferSize){
+    if (len > defaultTempBufferSize) {
      byteBuf.release();
    }
 
@@ -327,12 +325,11 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException
 
   /**
-   Has the same contract as {@link java.io.InputStream#skip(long)}
+   * Has the same contract as {@link java.io.InputStream#skip(long)}
   * Skips upto the next n bytes.
   * Skip may return with less than n bytes skipped
   */
-  @Override
-  public synchronized long skip(long n) throws IOException {
+  @Override public synchronized long skip(long n) throws IOException {
     checkInputStreamState();
     long bytesAvailable = this.count - this.curPosInBuffer;
     long bytesSkipped = 0;
@@ -353,8 +350,7 @@ public synchronized long skip(long n) throws IOException {
   }
 
-  @Override
-  public synchronized int available() throws IOException {
+  @Override public synchronized int available() throws IOException {
     checkInputStreamState();
     int bytesAvailable = this.count - this.curPosInBuffer;
     int underlyingAvailable = getInputStream().available();
@@ -365,18 +361,15 @@ public synchronized int available() throws IOException {
     return available;
   }
 
-  @Override
-  public synchronized void mark(int readlimit) {
+  @Override public synchronized void mark(int readlimit) {
     throw new UnsupportedOperationException("Mark/reset is not supported.");
   }
 
-  @Override
-  public synchronized void reset() throws IOException {
+  @Override public synchronized void reset() throws IOException {
     throw new UnsupportedOperationException("Mark/reset is not supported.");
   }
 
-  @Override
-  public boolean markSupported() {
+  @Override public boolean markSupported() {
     return false;
   }
 
@@ -384,7 +377,7 @@ public boolean markSupported() {
    Returns the current position from the beginning of the underlying input stream
   */
  public long getPos() throws IOException {
-    return curPosInBuffer+startOffset;
+    return curPosInBuffer + startOffset;
  }
 
  public boolean hasRemainder() throws IOException {
@@ -412,6 +405,11 @@ public void close() throws IOException {
     }
   }
 
+  /**
+   * Uncomment For testing Parquet files that are too big to use in unit tests
+   * @param args
+   */
+  /*
   public static void main(String[] args) {
     final DrillConfig config = DrillConfig.create();
     final BufferAllocator allocator = RootAllocatorFactory.newRoot(config);
@@ -433,8 +431,8 @@ public static void main(String[] args) {
         long totalByteSize = columnMetadata.getTotalSize();
         String streamId = fileName + ":" + columnMetadata.toString();
         BufferedDirectBufInputStream reader =
-            new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset,
-                totalByteSize, BUFSZ, true);
+            new BufferedDirectBufInputStream(inputStream, allocator, streamId, startOffset, totalByteSize,
+                BUFSZ, true);
         reader.init();
         while (true) {
           try {
@@ -457,4 +455,5 @@ public static void main(String[] args) {
     allocator.close();
     return;
   }
+  */
 }
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 60dcf1500ad..deb31b34e88 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -194,6 +194,10 @@ drill.exec: {
   },
   debug: {
     return_error_for_failure_in_cancelled_fragments: false
+  },
+  scan: {
+    threadpool_size: 8,
+    decode_threadpool_size: 1
   }
   udf: {
     retry-attempts: 5,
diff --git a/exec/java-exec/src/main/resources/rest/profile/profile.ftl b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
index ef971fbb50b..c0f2d8e588a 100644
--- a/exec/java-exec/src/main/resources/rest/profile/profile.ftl
+++ b/exec/java-exec/src/main/resources/rest/profile/profile.ftl
@@ -221,7 +221,7 @@

-
+
${op.getMetricsTable()}
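
For context when reviewing BufferedDirectBufInputStream: the commented-out main() in that file shows how the new buffering stream is meant to be driven against a large Parquet file. Below is a minimal sketch of the same read loop in isolation, assuming an already-open column-chunk InputStream and a BufferAllocator. The class and method names, the 8 MiB read-ahead size, the 64 KiB scratch buffer, and the interpretation of the trailing constructor flag (the buffering/fadvise hint passed as true in main()) are illustrative assumptions, not part of this patch.

import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;

import java.io.InputStream;

public class BufferedReadSketch {
  // Hypothetical helper mirroring the commented-out main(): stream one column chunk
  // through BufferedDirectBufInputStream and hand the bytes to a consumer.
  static void drainColumnChunk(InputStream in, BufferAllocator allocator, String streamId,
                               long startOffset, long totalByteSize) throws Exception {
    final int bufSize = 8 * 1024 * 1024;    // read-ahead buffer size; illustrative only
    BufferedDirectBufInputStream reader =
        new BufferedDirectBufInputStream(in, allocator, streamId, startOffset,
            totalByteSize, bufSize, true);  // trailing flag as passed in main()
    reader.init();                          // allocates the internal direct buffer
    DrillBuf scratch = allocator.buffer(64 * 1024);
    try {
      while (reader.hasRemainder()) {
        // Same contract as InputStream.read(byte[], int, int), but into a DrillBuf.
        int n = reader.read(scratch, 0, scratch.capacity());
        if (n <= 0) {
          break;
        }
        // ... hand the n bytes off to a page decoder here ...
      }
    } finally {
      scratch.release();
      reader.close();                       // closes the stream and frees its buffers
    }
  }
}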