From ee3489ce3b6e5ad53e5c3a59b6e2e4e50773c630 Mon Sep 17 00:00:00 2001 From: Parth Chandra Date: Tue, 13 Sep 2016 21:47:49 -0700 Subject: [PATCH] DRILL-4800: Various fixes. Fix buffer underflow exception in BufferedDirectBufInputStream. Fix writer index for int64 dictionary encoded types. Added logging to help debug. Fix memory leaks. Work around issues with InputStream.available() (do not use hasRemainder; remove check for EOF in BufferedDirectBufInputStream.read()). Finalize defaults. Remove commented code. Addressed review comments. This closes #611 --- .../java/org/apache/drill/test/DrillTest.java | 2 +- .../drill/exec/ops/OperatorContextImpl.java | 3 + .../drill/exec/server/BootStrapContext.java | 7 +- .../columnreaders/AsyncPageReader.java | 71 +++++++------- .../parquet/columnreaders/ColumnReader.java | 61 +++--------- .../NullableFixedByteAlignedReaders.java | 3 + .../parquet/columnreaders/PageReader.java | 16 +-- .../columnreaders/ParquetRecordReader.java | 20 ++-- .../columnreaders/VarLenBinaryReader.java | 86 +++------------- .../BufferedDirectBufInputStream.java | 97 ++++++++++--------- .../util/filereader/DirectBufInputStream.java | 21 +++- .../impl/writer/TestParquetWriter.java | 3 +- 12 files changed, 165 insertions(+), 225 deletions(-) diff --git a/common/src/test/java/org/apache/drill/test/DrillTest.java b/common/src/test/java/org/apache/drill/test/DrillTest.java index ccc297df3df..18c2c1a2ef7 100644 --- a/common/src/test/java/org/apache/drill/test/DrillTest.java +++ b/common/src/test/java/org/apache/drill/test/DrillTest.java @@ -55,7 +55,7 @@ public class DrillTest { static MemWatcher memWatcher; static String className; - @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000*10000); + @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000); @Rule public final TestLogReporter logOutcome = LOG_OUTCOME; @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java index 38ddd166812..390b71c2bc4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java @@ -192,6 +192,9 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException { } @Override + /* + Creates a DrillFileSystem that does not automatically track operator stats. + */ public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException { Preconditions.checkState(fs == null, "Tried to create a second FileSystem. 
Can only be called once per OperatorContext"); fs = new DrillFileSystem(conf, null); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java index adb6323bb54..c498185046a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/BootStrapContext.java @@ -53,8 +53,7 @@ public BootStrapContext(DrillConfig config, ScanResult classpathScan) { this.config = config; this.classpathScan = classpathScan; this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-"); - this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), - "BitClient-"); + this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-"); // Note that metrics are stored in a static instance this.metrics = DrillMetrics.getRegistry(); this.allocator = RootAllocatorFactory.newRoot(config); @@ -79,8 +78,8 @@ protected void afterExecute(final Runnable r, final Throwable t) { final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE); final int scanThreadPoolSize = MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads; - final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads; - + final int scanDecodeThreadPoolSize = + (numCores + 1) / 2 > numScanDecodeThreads ? (numCores + 1) / 2 : numScanDecodeThreads; this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-")); this.scanDecodeExecutor = Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-")); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java index b2bdef3d457..e2ba865d650 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/AsyncPageReader.java @@ -63,10 +63,12 @@ class AsyncPageReader extends PageReader { asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); } - @Override protected void loadDictionaryIfExists(final ColumnReader parentStatus, + @Override + protected void loadDictionaryIfExists(final ColumnReader parentStatus, final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException { if (columnChunkMetaData.getDictionaryPageOffset() > 0) { try { + assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos() ); dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos()); } catch (IOException e) { handleAndThrowException(e, "Error Reading dictionary page."); @@ -90,12 +92,12 @@ private DrillBuf getDecompressedPageData(ReadStatus readStatus) { isDictionary = readStatus.isDictionaryPage; } if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) { - DrillBuf uncompressedData = data; - data = decompress(readStatus.getPageHeader(), uncompressedData); + DrillBuf compressedData = data; + data = decompress(readStatus.getPageHeader(), compressedData); synchronized (this) { readStatus.setPageData(null); } - 
uncompressedData.release(); + compressedData.release(); } else { if (isDictionary) { stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead); @@ -160,23 +162,12 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) { pageDataBuf = allocateTemporaryBuffer(uncompressedSize); try { timer.start(); - if (logger.isTraceEnabled()) { - logger.trace("Decompress (1)==> Col: {} readPos: {} compressed_size: {} compressedPageData: {}", - parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(), - pageHeader.getCompressed_page_size(), ByteBufUtil.hexDump(compressedData)); - } CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec(); ByteBuffer input = compressedData.nioBuffer(0, compressedSize); ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize); DecompressionHelper decompressionHelper = new DecompressionHelper(codecName); decompressionHelper.decompress(input, compressedSize, output, uncompressedSize); pageDataBuf.writerIndex(uncompressedSize); - if (logger.isTraceEnabled()) { - logger.trace( - "Decompress (2)==> Col: {} readPos: {} uncompressed_size: {} uncompressedPageData: {}", - parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(), - pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageDataBuf)); - } timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize); } catch (IOException e) { @@ -219,30 +210,23 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) { } } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); - if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead() + if (parentColumnReader.totalValuesRead + readStatus.getValuesRead() < parentColumnReader.columnChunkMetaData.getValueCount()) { asyncPageRead = threadPool.submit(new AsyncPageReaderTask()); } pageHeader = readStatus.getPageHeader(); pageData = getDecompressedPageData(readStatus); - if (logger.isTraceEnabled()) { - logger.trace("AsyncPageReader: Col: {} pageData: {}", - this.parentColumnReader.columnChunkMetaData.toString(), ByteBufUtil.hexDump(pageData)); - logger.trace("AsyncPageReaderTask==> Col: {} readPos: {} Uncompressed_size: {} pageData: {}", - parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(), - pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData)); - } + } @Override public void clear() { if (asyncPageRead != null) { - asyncPageRead.cancel(true); try { - ReadStatus r = asyncPageRead.get(); - r.getPageData().release(); + final ReadStatus readStatus = asyncPageRead.get(); + readStatus.getPageData().release(); } catch (Exception e) { // Do nothing. 
} @@ -319,7 +303,8 @@ public AsyncPageReaderTask() { ReadStatus readStatus = new ReadStatus(); String oldname = Thread.currentThread().getName(); - Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString()); + String name = parent.parentColumnReader.columnChunkMetaData.toString(); + Thread.currentThread().setName(name); long bytesRead = 0; long valuesRead = 0; @@ -327,10 +312,28 @@ public AsyncPageReaderTask() { DrillBuf pageData = null; try { + long s = parent.dataReader.getPos(); PageHeader pageHeader = Util.readPageHeader(parent.dataReader); + long e = parent.dataReader.getPos(); + if (logger.isTraceEnabled()) { + logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s); + } int compressedSize = pageHeader.getCompressed_page_size(); + s = parent.dataReader.getPos(); pageData = parent.dataReader.getNext(compressedSize); + e = parent.dataReader.getPos(); bytesRead = compressedSize; + + if (logger.isTraceEnabled()) { + DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize); + int endOffset = compressedSize>100?compressedSize-100:0; + DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset); + logger + .trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ", + name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd)); + + } + synchronized (parent) { if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { readStatus.setIsDictionaryPage(true); @@ -353,10 +356,6 @@ public AsyncPageReaderTask() { throw e; } Thread.currentThread().setName(oldname); - if(logger.isTraceEnabled()) { - logger.trace("AsyncPageReaderTask==> Col: {} readPos: {} bytesRead: {} pageData: {}", parent.parentColumnReader.columnChunkMetaData.toString(), - parent.dataReader.getPos(), bytesRead, ByteBufUtil.hexDump(pageData)); - } return readStatus; } @@ -365,7 +364,7 @@ public AsyncPageReaderTask() { private class DecompressionHelper { final CompressionCodecName codecName; - public DecompressionHelper(CompressionCodecName codecName){ + public DecompressionHelper(CompressionCodecName codecName) { this.codecName = codecName; } @@ -376,6 +375,7 @@ public void decompress (ByteBuffer input, int compressedSize, ByteBuffer output, // expensive copying. if (codecName == CompressionCodecName.GZIP) { GzipCodec codec = new GzipCodec(); + // DirectDecompressor: @see https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/compress/DirectDecompressor.html DirectDecompressor directDecompressor = codec.createDirectDecompressor(); if (directDecompressor != null) { logger.debug("Using GZIP direct decompressor."); @@ -394,9 +394,10 @@ public void decompress (ByteBuffer input, int compressedSize, ByteBuffer output, output.put(outputBytes); } } else if (codecName == CompressionCodecName.SNAPPY) { - // For Snappy, just call the Snappy decompressor directly. - // It is thread safe. The Hadoop layers though, appear to be - // not quite reliable in a multithreaded environment + // For Snappy, just call the Snappy decompressor directly instead + // of going thru the DirectDecompressor class. + // The Snappy codec is itself thread safe, while going thru the DirectDecompressor path + // seems to have concurrency issues. 
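
A minimal standalone sketch of the direct-ByteBuffer Snappy path used just below, assuming the snappy-java library (org.xerial.snappy.Snappy) that backs the Snappy.uncompress call in this patch; the class and variable names here are illustrative only, not part of the patch:

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.xerial.snappy.Snappy;

    public class SnappyDirectBufferSketch {
      public static void main(String[] args) throws IOException {
        byte[] original = "example payload for a snappy round trip".getBytes(StandardCharsets.UTF_8);

        // snappy-java's ByteBuffer API expects direct buffers on both sides.
        ByteBuffer uncompressed = ByteBuffer.allocateDirect(original.length);
        uncompressed.put(original);
        uncompressed.flip();
        ByteBuffer compressed = ByteBuffer.allocateDirect(Snappy.maxCompressedLength(original.length));
        Snappy.compress(uncompressed, compressed);

        // Decompress straight between direct buffers, mirroring the clear()/uncompress()/limit()
        // sequence in the decompress() method below.
        ByteBuffer output = ByteBuffer.allocateDirect(original.length);
        output.clear();
        int size = Snappy.uncompress(compressed, output);
        output.limit(size);
        System.out.println("uncompressed " + size + " bytes");
      }
    }
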
output.clear(); int size = Snappy.uncompress(input, output); output.limit(size); diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index 29e23bc3876..73cbc3dbb47 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -163,22 +163,13 @@ public void readValues(long recordsToRead) { protected abstract void readField(long recordsToRead); - /* - public Future determineSizeAsync(long recordsReadInCurrentPass, - Integer lengthVarFieldsInCurrentRecord) throws IOException { - Future r = threadPool.submit( - new ColumnReaderDetermineSizeTask(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord)); - return r; - } - */ - /** * Determines the size of a single value in a variable column. * * Return value indicates if we have finished a row group and should stop reading * * @param recordsReadInCurrentPass - * @ param lengthVarFieldsInCurrentRecord + * @param lengthVarFieldsInCurrentRecord * @return - true if we should stop reading * @throws IOException */ @@ -194,7 +185,7 @@ public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFie return true; } - //lengthVarFieldsInCurrentRecord += dataTypeLengthInBits; + // Never used in this code path. Hard to remove because the method is overidden by subclasses lengthVarFieldsInCurrentRecord = -1; doneReading = checkVectorCapacityReached(); @@ -307,41 +298,18 @@ public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){ @Override public Long call() throws IOException{ String oldname = Thread.currentThread().getName(); - Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString()); - - this.parent.processPages(recordsToReadInThisPass); - - Thread.currentThread().setName(oldname); - return recordsToReadInThisPass; - } - - } - - /* - private class ColumnReaderDetermineSizeTask implements Callable { - - private final ColumnReader parent = ColumnReader.this; - private final long recordsReadInCurrentPass; - private final Integer lengthVarFieldsInCurrentRecord; - - public ColumnReaderDetermineSizeTask(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord){ - this.recordsReadInCurrentPass = recordsReadInCurrentPass; - this.lengthVarFieldsInCurrentRecord = lengthVarFieldsInCurrentRecord; - } - - @Override public Boolean call() throws IOException{ - - String oldname = Thread.currentThread().getName(); - Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString()); + try { + Thread.currentThread().setName(oldname + "Decode-" + this.parent.columnChunkMetaData.toString()); - boolean b = this.parent.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord); + this.parent.processPages(recordsToReadInThisPass); + return recordsToReadInThisPass; - Thread.currentThread().setName(oldname); - return b; + } finally { + Thread.currentThread().setName(oldname); + } } } - */ private class ColumnReaderReadRecordsTask implements Callable { @@ -355,12 +323,15 @@ public ColumnReaderReadRecordsTask(int recordsToRead){ @Override public Integer call() throws IOException{ String oldname = Thread.currentThread().getName(); - Thread.currentThread().setName("Decode-"+this.parent.columnChunkMetaData.toString()); + try { + 
Thread.currentThread().setName("Decode-" + this.parent.columnChunkMetaData.toString()); - this.parent.readRecords(recordsToRead); + this.parent.readRecords(recordsToRead); + return recordsToRead; - Thread.currentThread().setName(oldname); - return recordsToRead; + } finally { + Thread.currentThread().setName(oldname); + } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index f4fe5ee8da1..e20504f3daa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -150,7 +150,10 @@ protected void readField(long recordsToReadInThisPass) { for (int i = 0; i < recordsToReadInThisPass; i++){ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger()); } + int writerIndex = castedBaseVector.getBuffer().writerIndex(); + castedBaseVector.getBuffer().setIndex(0, writerIndex + (int)readLength); } else { + for (int i = 0; i < recordsToReadInThisPass; i++){ valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger()); } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 0736f01870d..f71eeae646f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store.parquet.columnreaders; import com.google.common.base.Stopwatch; +import io.netty.buffer.ByteBufUtil; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream; import io.netty.buffer.ByteBuf; @@ -63,9 +64,8 @@ class PageReader { public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter; protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader parentColumnReader; - //private final ColumnDataReader dataReader; protected final DirectBufInputStream dataReader; - //der; buffer to store bytes of current page + //buffer to store bytes of current page protected DrillBuf pageData; // for variable length data we need to keep track of our current position in the page data @@ -189,6 +189,11 @@ private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompr if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) { timer.start(); pageDataBuf = dataReader.getNext(compressedSize); + if (logger.isTraceEnabled()) { + logger.trace("PageReaderTask==> Col: {} readPos: {} Uncompressed_size: {} pageData: {}", + parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(), + pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData)); + } timeToRead = timer.elapsed(TimeUnit.NANOSECONDS); this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize); } else { @@ -247,9 +252,6 @@ protected void nextInternal() throws IOException{ } } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); - //TODO: Handle buffer 
allocation exception - - //allocatePageData(pageHeader.getUncompressed_page_size()); int compressedSize = pageHeader.getCompressed_page_size(); int uncompressedSize = pageHeader.getUncompressed_page_size(); pageData = readPage(pageHeader, compressedSize, uncompressedSize); @@ -270,7 +272,7 @@ public boolean next() throws IOException { // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause // and submit a bug report - if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { + if(parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) { return false; } clearBuffers(); @@ -395,7 +397,7 @@ protected void clearDictionaryBuffers() { public void clear(){ try { - this.inputStream.close(); + // data reader also owns the input stream and will close it. this.dataReader.close(); } catch (IOException e) { //Swallow the exception which is OK for input streams diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 4f0e3b5a205..69f6a62800a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -476,11 +476,11 @@ public void readAllFixedFields(long recordsToRead) throws IOException { if(useAsyncColReader){ readAllFixedFieldsParallel(recordsToRead) ; } else { - readAllFixedFieldsiSerial(recordsToRead); ; + readAllFixedFieldsSerial(recordsToRead); ; } } - public void readAllFixedFieldsiSerial(long recordsToRead) throws IOException { + public void readAllFixedFieldsSerial(long recordsToRead) throws IOException { for (ColumnReader crs : columnStatuses) { crs.processPages(recordsToRead); } @@ -492,14 +492,22 @@ public void readAllFixedFieldsParallel(long recordsToRead) throws IOException { Future f = crs.processPagesAsync(recordsToRead); futures.add(f); } + Exception exception = null; for(Future f: futures){ - try { - f.get(); - } catch (Exception e) { + if(exception != null) { f.cancel(true); - handleAndRaise(null, e); + } else { + try { + f.get(); + } catch (Exception e) { + f.cancel(true); + exception = e; + } } } + if(exception != null){ + handleAndRaise(null, exception); + } } @Override diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java index c78dc7a1016..7bcce11bd2c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLenBinaryReader.java @@ -57,11 +57,7 @@ public long readFields(long recordsToReadInThisPass, ColumnReader firstColumn columnReader.reset(); } - //if(useAsyncTasks){ - // recordsReadInCurrentPass = determineSizesParallel(recordsToReadInThisPass); - //} else { - recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); - //} + recordsReadInCurrentPass = determineSizesSerial(recordsToReadInThisPass); if(useAsyncTasks){ readRecordsParallel(recordsReadInCurrentPass); }else{ @@ -102,71 +98,6 @@ private long determineSizesSerial(long recordsToReadInThisPass) 
throws IOExcepti return recordsReadInCurrentPass; } - - public long determineSizesParallel(long recordsToReadInThisPass ) throws IOException { - boolean doneReading = false; - int lengthVarFieldsInCurrentRecord = 0; - boolean exitLengthDeterminingLoop = false; - long totalVariableLengthData = 0; - long recordsReadInCurrentPass = 0; - - do { - doneReading = readPagesParallel(); - - if (!doneReading) { - lengthVarFieldsInCurrentRecord = 0; - for (VarLengthColumn columnReader : columns) { - doneReading = columnReader.processPageData((int) recordsReadInCurrentPass); - if(doneReading) { - break; - } - lengthVarFieldsInCurrentRecord += columnReader.dataTypeLengthInBits; - doneReading = columnReader.checkVectorCapacityReached(); - if(doneReading) { - break; - } - } - } - - exitLengthDeterminingLoop = doneReading; - - // check that the next record will fit in the batch - if (exitLengthDeterminingLoop || - (recordsReadInCurrentPass + 1) * parentReader.getBitWidthAllFixedFields() - + totalVariableLengthData + lengthVarFieldsInCurrentRecord > parentReader.getBatchSize()) { - break; - } - for (VarLengthColumn columnReader : columns) { - columnReader.updateReadyToReadPosition(); - columnReader.currDefLevel = -1; - } - recordsReadInCurrentPass++; - totalVariableLengthData += lengthVarFieldsInCurrentRecord; - } while (recordsReadInCurrentPass < recordsToReadInThisPass); - - return recordsReadInCurrentPass; - } - - public boolean readPagesParallel() { - - boolean isDone = false; - ArrayList> futures = Lists.newArrayList(); - for (VarLengthColumn columnReader : columns) { - Future f = columnReader.readPageAsync(); - futures.add(f); - } - for (Future f : futures) { - try { - isDone = isDone || f.get().booleanValue(); - } catch (Exception e) { - f.cancel(true); - handleAndRaise(null, e); - } - } - return isDone; - } - - private void readRecordsSerial(long recordsReadInCurrentPass) { for (VarLengthColumn columnReader : columns) { columnReader.readRecords(columnReader.pageReader.valuesReadyToRead); @@ -182,12 +113,17 @@ private void readRecordsParallel(long recordsReadInCurrentPass){ Future f = columnReader.readRecordsAsync(columnReader.pageReader.valuesReadyToRead); futures.add(f); } - for (Future f : futures) { - try { - f.get(); - } catch (Exception e) { + Exception exception = null; + for(Future f: futures){ + if(exception != null) { f.cancel(true); - handleAndRaise(null, e); + } else { + try { + f.get(); + } catch (Exception e) { + f.cancel(true); + exception = e; + } } } for (VarLengthColumn columnReader : columns) { diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java index a5a6b8179d5..327c9a168cf 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/BufferedDirectBufInputStream.java @@ -19,24 +19,13 @@ import com.google.common.base.Preconditions; import io.netty.buffer.DrillBuf; -import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.memory.BufferAllocator; -import org.apache.drill.exec.memory.RootAllocatorFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.parquet.hadoop.Footer; -import org.apache.parquet.hadoop.ParquetFileReader; -import 
org.apache.parquet.hadoop.metadata.BlockMetaData; -import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData; import org.apache.parquet.hadoop.util.CompatibilityUtil; import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import java.util.List; /** * BufferedDirectBufInputStream reads from the @@ -52,8 +41,9 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BufferedDirectBufInputStream.class); - private static int defaultBufferSize = 8192 * 1024; // 8 MiB - private static int defaultTempBufferSize = 8192; // 8 KiB + private static final int DEFAULT_BUFFER_SIZE = 8192 * 1024; // 8 MiB + private static final int DEFAULT_TEMP_BUFFER_SIZE = 8192; // 8 KiB + private static final int SMALL_BUFFER_SIZE = 64 * 1024; // 64 KiB /** * The internal buffer to keep data read from the underlying inputStream. @@ -82,11 +72,10 @@ public class BufferedDirectBufInputStream extends DirectBufInputStream implement protected long curPosInStream; // current offset in the input stream - private final int bufSize; + private int bufSize; private volatile DrillBuf tempBuffer; // a temp Buffer for use by read(byte[] buf, int off, int len) - private DrillBuf getBuf() throws IOException { checkInputStreamState(); if (internalBuffer == null) { @@ -101,7 +90,7 @@ private DrillBuf getBuf() throws IOException { */ public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, String id, long startOffset, long totalByteSize, boolean enableHints) { - this(in, allocator, id, startOffset, totalByteSize, defaultBufferSize, enableHints); + this(in, allocator, id, startOffset, totalByteSize, DEFAULT_BUFFER_SIZE, enableHints); } /** @@ -130,13 +119,21 @@ public BufferedDirectBufInputStream(InputStream in, BufferAllocator allocator, S @Override public void init() throws UnsupportedOperationException, IOException { super.init(); this.internalBuffer = this.allocator.buffer(this.bufSize); - this.tempBuffer = this.allocator.buffer(defaultTempBufferSize); + this.tempBuffer = this.allocator.buffer(DEFAULT_TEMP_BUFFER_SIZE); int bytesRead = getNextBlock(); if (bytesRead <= 0) { throw new IOException("End of stream reached while initializing buffered reader."); } } + private DrillBuf reallocBuffer(int newSize ){ + this.internalBuffer.release(); + this.bufSize = newSize; + this.internalBuffer = this.allocator.buffer(this.bufSize); + logger.debug("Internal buffer resized to {}", newSize); + return this.internalBuffer; + } + /** * Read one more block from the underlying stream. * Assumes we have reached the end of buffered data @@ -152,11 +149,14 @@ private int getNextBlock() throws IOException { this.count = this.curPosInBuffer = 0; // We *cannot* rely on the totalByteSize being correct because - // metadata for Parquet files is incorrect. So we read as - // much as we can up to the size of the buffer - //int bytesToRead = buffer.capacity() <= (totalByteSize + startOffset - curPosInStream ) ? - // buffer.Capacity() : - // (int) (totalByteSize + startOffset - curPosInStream ); + // metadata for Parquet files is incorrect (sometimes). So we read + // beyond the totalByteSize parameter. However, to prevent ourselves from reading too + // much data, we reduce the size of the buffer, down to 64KiB. 
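
The sizing rule described in the comment above, shown as a small standalone helper for clarity (method and parameter names are illustrative, not part of this patch):

    // Once the (possibly inaccurate) advertised remainder fits into the current buffer,
    // fall back to a small 64 KiB buffer so that reading past the advertised end of the
    // column chunk cannot pull in much extra data.
    static int nextBufferCapacity(long totalByteSize, long startOffset, long curPosInStream,
        int currentCapacity, int smallBufferSize) {
      long advertisedRemaining = totalByteSize + startOffset - curPosInStream;
      if (currentCapacity >= advertisedRemaining && currentCapacity > smallBufferSize) {
        return smallBufferSize; // SMALL_BUFFER_SIZE above, 64 * 1024
      }
      return currentCapacity;
    }
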
+ if (buffer.capacity() >= (totalByteSize + startOffset - curPosInStream)) { + if (buffer.capacity() > SMALL_BUFFER_SIZE) { + buffer = this.reallocBuffer(SMALL_BUFFER_SIZE); + } + } int bytesToRead = buffer.capacity(); ByteBuffer directBuffer = buffer.nioBuffer(curPosInBuffer, bytesToRead); @@ -171,6 +171,7 @@ private int getNextBlock() throws IOException { nBytes = CompatibilityUtil.getBuf(getInputStream(), directBuffer, bytesToRead); } catch (Exception e) { logger.error("Error reading from stream {}. Error was : {}", this.streamId, e.getMessage()); + throw new IOException((e)); } if (nBytes > 0) { buffer.writerIndex(nBytes); @@ -269,12 +270,13 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException } } else { bytesRead += nRead; + //TODO: Uncomment this when the InputStream.available() call is fixed. // If the last read caused us to reach the end of stream - // we are done - InputStream input = in; - if (input != null && input.available() <= 0) { - return bytesRead; - } + // we are done. + //InputStream input = in; + //if (input != null && input.available() <= 0) { + // return bytesRead; + //} } } while (bytesRead < len); return bytesRead; @@ -294,7 +296,7 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException return 0; } DrillBuf byteBuf; - if (len <= defaultTempBufferSize) { + if (len <= DEFAULT_TEMP_BUFFER_SIZE) { byteBuf = tempBuffer; } else { byteBuf = this.allocator.buffer(len); @@ -310,13 +312,13 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException return bytesRead; } } else { - byteBuf.nioBuffer().get(buf, off + bytesRead, len - bytesRead); + byteBuf.nioBuffer().get(buf, off + bytesRead, nRead); byteBuf.clear(); bytesRead += nRead; } } while (bytesRead < len); - if (len > defaultTempBufferSize) { + if (len > DEFAULT_TEMP_BUFFER_SIZE) { byteBuf.release(); } @@ -380,27 +382,26 @@ public long getPos() throws IOException { return curPosInBuffer + startOffset; } - public boolean hasRemainder() throws IOException { - return available() > 0; - } - public void close() throws IOException { DrillBuf buffer; InputStream inp; - if ((inp = in) != null) { - in = null; - inp.close(); - } - if ((buffer = this.internalBuffer) != null) { - synchronized (this) { - this.internalBuffer = null; - buffer.release(); - } - } - if ((buffer = this.tempBuffer) != null) { - synchronized (this) { - this.tempBuffer = null; - buffer.release(); + synchronized (this) { + try { + if ((inp = in) != null) { + in = null; + inp.close(); + } + } catch (IOException e) { + throw e; + } finally { + if ((buffer = this.internalBuffer) != null) { + this.internalBuffer = null; + buffer.release(); + } + if ((buffer = this.tempBuffer) != null) { + this.tempBuffer = null; + buffer.release(); + } } } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java index 71c36e62a2f..f11ad1f18e9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/filereader/DirectBufInputStream.java @@ -89,7 +89,13 @@ public synchronized int read(DrillBuf buf, int off, int len) throws IOException public synchronized DrillBuf getNext(int bytes) throws IOException { DrillBuf b = allocator.buffer(bytes); - int bytesRead = read(b, 0, bytes); + int bytesRead = -1; + try { + bytesRead = read(b, 0, bytes); + } catch 
(IOException e){ + b.release(); + throw e; + } if (bytesRead <= -1) { b.release(); return null; @@ -102,7 +108,10 @@ public long getPos() throws IOException { } public boolean hasRemainder() throws IOException { - return getInputStream().available() > 0; + // We use the following instead of "getInputStream.available() > 0" because + // available() on HDFS seems to have issues with file sizes + // that are greater than Integer.MAX_VALUE + return (this.getPos() < (this.startOffset + this.totalByteSize)); } protected FSDataInputStream getInputStream() throws IOException { @@ -117,6 +126,14 @@ protected void checkInputStreamState() throws IOException { } } + public synchronized void close() throws IOException { + InputStream inp; + if ((inp = in) != null) { + in = null; + inp.close(); + } + } + protected void checkStreamSupportsByteBuffer() throws UnsupportedOperationException { // Check input stream supports ByteBuffer if (!(in instanceof ByteBufferReadable)) { diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 56b94d7daee..ae0e699411b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -929,6 +929,7 @@ private void compareParquetInt96Converters(String selection, String table) throw } } + @Ignore ("Used to test decompression in AsyncPageReader. Takes too long.") @Test public void testTPCHReadWriteRunRepeated() throws Exception { for (int i = 1; i <= repeat; i++) { @@ -945,7 +946,6 @@ public void testTPCHReadWriteGzip() throws Exception { try { test(String.format("alter session set `%s` = 'gzip'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); String inputTable = "cp.`tpch/supplier.parquet`"; -// runTestAndValidate("s_suppkey, s_nationkey, s_acctbal", "s_suppkey, s_nationkey, s_acctbal", inputTable, "suppkey_parquet_dict_gzip"); runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_gzip"); } finally { test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val)); @@ -957,7 +957,6 @@ public void testTPCHReadWriteSnappy() throws Exception { try { test(String.format("alter session set `%s` = 'snappy'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE)); String inputTable = "cp.`supplier_snappy.parquet`"; - // runTestAndValidate("s_suppkey, s_nationkey, s_acctbal", "s_suppkey, s_nationkey, s_acctbal", inputTable, "suppkey_parquet_dict_gzip"); runTestAndValidate("*", "*", inputTable, "suppkey_parquet_dict_snappy"); } finally { test(String.format("alter session set `%s` = '%s'", ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE, ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE_VALIDATOR.getDefault().string_val));
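
A closing note on the DirectBufInputStream.hasRemainder() change above: InputStream.available() returns an int, so it is not a reliable end-of-data signal for HDFS streams larger than Integer.MAX_VALUE; comparing the current position against the known extent of the column chunk avoids that. A minimal sketch of the check (parameter names are illustrative):

    // Position-based remainder test: startOffset and totalByteSize describe the byte range
    // this reader is responsible for, pos is the current read position in the stream.
    static boolean hasRemainder(long pos, long startOffset, long totalByteSize) {
      return pos < startOffset + totalByteSize;
    }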