DRILL-4800: Various fixes. Fix buffer underflow exception in BufferedDirectBufInputStream. Fix writer index for int64 dictionary encoded types. Add logging to help debug. Fix memory leaks. Work around issues with InputStream.available() (do not use hasRemainder; remove the EOF check in BufferedDirectBufInputStream.read()). Finalize defaults. Remove commented code.

Addressed review comments

This closes #611
parthchandra committed Nov 4, 2016
1 parent 7f5acf8 commit ee3489c
Showing 12 changed files with 165 additions and 225 deletions.
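The key behavioral change across these files is dropping InputStream.available() (surfaced in the readers as hasRemainder()) as an end-of-data signal. available() only reports the bytes readable without blocking, and InputStream's own default implementation simply returns 0, so a buffered or remote stream may legitimately report 0 in the middle of the data. A minimal standalone sketch of the pitfall (illustrative code, not part of this commit):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class AvailableVsEof {
      public static void main(String[] args) throws IOException {
        final InputStream wrapped = new ByteArrayInputStream(new byte[] {1, 2, 3});
        // Inherits InputStream's default available(), which returns 0.
        InputStream in = new InputStream() {
          @Override public int read() throws IOException { return wrapped.read(); }
        };
        System.out.println(in.available()); // prints 0, yet three bytes remain
        int b;
        while ((b = in.read()) != -1) {     // -1 is the only reliable EOF signal
          System.out.println("read byte " + b);
        }
      }
    }

This is why the page readers below stop consulting hasRemainder() and instead compare the number of values read against the column chunk's value count from the Parquet metadata.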
common/src/test/java/org/apache/drill/test/DrillTest.java (1 addition, 1 deletion)

@@ -55,7 +55,7 @@ public class DrillTest {
   static MemWatcher memWatcher;
   static String className;

-  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000*10000);
+  @Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(50000);
   @Rule public final TestLogReporter logOutcome = LOG_OUTCOME;

   @Rule public final TestRule REPEAT_RULE = TestTools.getRepeatRule(false);
@@ -192,6 +192,9 @@ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
   }

   @Override
+  /*
+    Creates a DrillFileSystem that does not automatically track operator stats.
+  */
   public DrillFileSystem newNonTrackingFileSystem(Configuration conf) throws IOException {
     Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
     fs = new DrillFileSystem(conf, null);
@@ -53,8 +53,7 @@ public BootStrapContext(DrillConfig config, ScanResult classpathScan) {
     this.config = config;
     this.classpathScan = classpathScan;
     this.loop = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitServer-");
-    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS),
-        "BitClient-");
+    this.loop2 = TransportCheck.createEventLoopGroup(config.getInt(ExecConstants.BIT_SERVER_RPC_THREADS), "BitClient-");
     // Note that metrics are stored in a static instance
     this.metrics = DrillMetrics.getRegistry();
     this.allocator = RootAllocatorFactory.newRoot(config);

@@ -79,8 +78,8 @@ protected void afterExecute(final Runnable r, final Throwable t) {
     final int numScanDecodeThreads = (int) config.getDouble(ExecConstants.SCAN_DECODE_THREADPOOL_SIZE);
     final int scanThreadPoolSize =
         MIN_SCAN_THREADPOOL_SIZE > numScanThreads ? MIN_SCAN_THREADPOOL_SIZE : numScanThreads;
-    final int scanDecodeThreadPoolSize = numCores > numScanDecodeThreads ? numCores : numScanDecodeThreads;
+    final int scanDecodeThreadPoolSize =
+        (numCores + 1) / 2 > numScanDecodeThreads ? (numCores + 1) / 2 : numScanDecodeThreads;
     this.scanExecutor = Executors.newFixedThreadPool(scanThreadPoolSize, new NamedThreadFactory("scan-"));
     this.scanDecodeExecutor =
         Executors.newFixedThreadPool(scanDecodeThreadPoolSize, new NamedThreadFactory("scan-decode-"));
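The finalized default drops the scan-decode pool from one thread per core to half the cores, rounded up; a larger configured value still wins. A small sketch of the sizing rule (hypothetical class and method names, not Drill's API):

    public class ScanDecodePoolSizing {
      // Mirrors the ternary above: max(ceil(numCores / 2), configured).
      static int scanDecodePoolSize(int numCores, int configured) {
        int half = (numCores + 1) / 2;
        return half > configured ? half : configured;
      }

      public static void main(String[] args) {
        System.out.println(scanDecodePoolSize(8, 0)); // 4 (was 8 before this change)
        System.out.println(scanDecodePoolSize(8, 6)); // 6 (explicit config still wins)
      }
    }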
@@ -63,10 +63,12 @@ class AsyncPageReader extends PageReader {
     asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
   }

-  @Override protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
+  @Override
+  protected void loadDictionaryIfExists(final ColumnReader<?> parentStatus,
       final ColumnChunkMetaData columnChunkMetaData, final DirectBufInputStream f) throws UserException {
     if (columnChunkMetaData.getDictionaryPageOffset() > 0) {
       try {
+        assert(columnChunkMetaData.getDictionaryPageOffset() >= dataReader.getPos());
         dataReader.skip(columnChunkMetaData.getDictionaryPageOffset() - dataReader.getPos());
       } catch (IOException e) {
         handleAndThrowException(e, "Error Reading dictionary page.");
@@ -90,12 +92,12 @@ private DrillBuf getDecompressedPageData(ReadStatus readStatus) {
       isDictionary = readStatus.isDictionaryPage;
     }
     if (parentColumnReader.columnChunkMetaData.getCodec() != CompressionCodecName.UNCOMPRESSED) {
-      DrillBuf uncompressedData = data;
-      data = decompress(readStatus.getPageHeader(), uncompressedData);
+      DrillBuf compressedData = data;
+      data = decompress(readStatus.getPageHeader(), compressedData);
       synchronized (this) {
         readStatus.setPageData(null);
       }
-      uncompressedData.release();
+      compressedData.release();
     } else {
       if (isDictionary) {
         stats.totalDictPageReadBytes.addAndGet(readStatus.bytesRead);
@@ -160,23 +162,12 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
     pageDataBuf = allocateTemporaryBuffer(uncompressedSize);
     try {
       timer.start();
-      if (logger.isTraceEnabled()) {
-        logger.trace("Decompress (1)==> Col: {} readPos: {} compressed_size: {} compressedPageData: {}",
-            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-            pageHeader.getCompressed_page_size(), ByteBufUtil.hexDump(compressedData));
-      }
       CompressionCodecName codecName = parentColumnReader.columnChunkMetaData.getCodec();
       ByteBuffer input = compressedData.nioBuffer(0, compressedSize);
       ByteBuffer output = pageDataBuf.nioBuffer(0, uncompressedSize);
       DecompressionHelper decompressionHelper = new DecompressionHelper(codecName);
       decompressionHelper.decompress(input, compressedSize, output, uncompressedSize);
       pageDataBuf.writerIndex(uncompressedSize);
-      if (logger.isTraceEnabled()) {
-        logger.trace(
-            "Decompress (2)==> Col: {} readPos: {} uncompressed_size: {} uncompressedPageData: {}",
-            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-            pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageDataBuf));
-      }
       timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Decompress", 0, timeToRead, compressedSize, uncompressedSize);
     } catch (IOException e) {
@@ -219,30 +210,23 @@ private DrillBuf decompress(PageHeader pageHeader, DrillBuf compressedData) {
       }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);

-    if (dataReader.hasRemainder() && parentColumnReader.totalValuesRead + readStatus.getValuesRead()
+    if (parentColumnReader.totalValuesRead + readStatus.getValuesRead()
         < parentColumnReader.columnChunkMetaData.getValueCount()) {
       asyncPageRead = threadPool.submit(new AsyncPageReaderTask());
     }

     pageHeader = readStatus.getPageHeader();
     pageData = getDecompressedPageData(readStatus);
-    if (logger.isTraceEnabled()) {
-      logger.trace("AsyncPageReader: Col: {} pageData: {}",
-          this.parentColumnReader.columnChunkMetaData.toString(), ByteBufUtil.hexDump(pageData));
-      logger.trace("AsyncPageReaderTask==> Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
-          parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
-          pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData));
-    }
   }

   @Override public void clear() {
     if (asyncPageRead != null) {
+      asyncPageRead.cancel(true);
       try {
-        ReadStatus r = asyncPageRead.get();
-        r.getPageData().release();
+        final ReadStatus readStatus = asyncPageRead.get();
+        readStatus.getPageData().release();
       } catch (Exception e) {
         // Do nothing.
       }
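The clear() change above is one of the memory-leak fixes: the in-flight read is now cancelled first, and if the task had already produced a page its direct buffer is released. A condensed sketch of the pattern (illustrative types, not Drill's):

    import java.util.concurrent.Future;

    public class CancelAndDrain {
      interface Page { void release(); }

      static void clear(Future<Page> asyncRead) {
        if (asyncRead == null) {
          return;
        }
        asyncRead.cancel(true);
        try {
          // If cancel won the race, get() throws CancellationException and
          // there is nothing to free; if the task finished first, release
          // the page instead of leaking its direct memory.
          asyncRead.get().release();
        } catch (Exception e) {
          // Do nothing: no page was produced.
        }
      }
    }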
@@ -319,18 +303,37 @@ public AsyncPageReaderTask() {
       ReadStatus readStatus = new ReadStatus();

       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(parent.parentColumnReader.columnChunkMetaData.toString());
+      String name = parent.parentColumnReader.columnChunkMetaData.toString();
+      Thread.currentThread().setName(name);

       long bytesRead = 0;
       long valuesRead = 0;
       Stopwatch timer = Stopwatch.createStarted();

       DrillBuf pageData = null;
       try {
+        long s = parent.dataReader.getPos();
         PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
+        long e = parent.dataReader.getPos();
+        if (logger.isTraceEnabled()) {
+          logger.trace("[{}]: Read Page Header : ReadPos = {} : Bytes Read = {} ", name, s, e - s);
+        }
         int compressedSize = pageHeader.getCompressed_page_size();
+        s = parent.dataReader.getPos();
         pageData = parent.dataReader.getNext(compressedSize);
+        e = parent.dataReader.getPos();
         bytesRead = compressedSize;

+        if (logger.isTraceEnabled()) {
+          DrillBuf bufStart = pageData.slice(0, compressedSize>100?100:compressedSize);
+          int endOffset = compressedSize>100?compressedSize-100:0;
+          DrillBuf bufEnd = pageData.slice(endOffset, compressedSize-endOffset);
+          logger.trace("[{}]: Read Page Data : ReadPos = {} : Bytes Read = {} : Buf Start = {} : Buf End = {} ",
+              name, s, e - s, ByteBufUtil.hexDump(bufStart), ByteBufUtil.hexDump(bufEnd));
+        }
+
         synchronized (parent) {
           if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
             readStatus.setIsDictionaryPage(true);
@@ -353,10 +356,6 @@ public AsyncPageReaderTask() {
         throw e;
       }
       Thread.currentThread().setName(oldname);
-      if(logger.isTraceEnabled()) {
-        logger.trace("AsyncPageReaderTask==> Col: {} readPos: {} bytesRead: {} pageData: {}",
-            parent.parentColumnReader.columnChunkMetaData.toString(),
-            parent.dataReader.getPos(), bytesRead, ByteBufUtil.hexDump(pageData));
-      }
       return readStatus;
     }


@@ -365,7 +364,7 @@
   private class DecompressionHelper {
     final CompressionCodecName codecName;

-    public DecompressionHelper(CompressionCodecName codecName){
+    public DecompressionHelper(CompressionCodecName codecName) {
       this.codecName = codecName;
     }


@@ -376,6 +375,7 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
       // expensive copying.
       if (codecName == CompressionCodecName.GZIP) {
         GzipCodec codec = new GzipCodec();
+        // DirectDecompressor: @see https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/io/compress/DirectDecompressor.html
         DirectDecompressor directDecompressor = codec.createDirectDecompressor();
         if (directDecompressor != null) {
           logger.debug("Using GZIP direct decompressor.");
@@ -394,9 +394,10 @@ public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize)
           output.put(outputBytes);
         }
       } else if (codecName == CompressionCodecName.SNAPPY) {
-        // For Snappy, just call the Snappy decompressor directly.
-        // It is thread safe. The Hadoop layers though, appear to be
-        // not quite reliable in a multithreaded environment
+        // For Snappy, just call the Snappy decompressor directly instead
+        // of going thru the DirectDecompressor class.
+        // The Snappy codec is itself thread safe, while going thru the
+        // DirectDecompressor path seems to have concurrency issues.
         output.clear();
         int size = Snappy.uncompress(input, output);
         output.limit(size);
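For reference, the snappy-java ByteBuffer API used above operates on direct buffers and returns the uncompressed length, which is why the code clears the output buffer and then sets its limit. A self-contained usage sketch (assumes the xerial snappy-java library on the classpath):

    import java.nio.ByteBuffer;
    import org.xerial.snappy.Snappy;

    public class SnappyDirectExample {
      public static void main(String[] args) throws Exception {
        byte[] raw = "hello hello hello hello".getBytes("UTF-8");
        byte[] compressed = Snappy.compress(raw);

        // Snappy's ByteBuffer overloads require direct buffers.
        ByteBuffer input = ByteBuffer.allocateDirect(compressed.length);
        input.put(compressed);
        input.flip();
        ByteBuffer output = ByteBuffer.allocateDirect(raw.length);

        int size = Snappy.uncompress(input, output); // returns uncompressed size
        output.limit(size);                          // same pattern as the diff above
        System.out.println("decompressed " + size + " bytes");
      }
    }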
@@ -163,22 +163,13 @@ public void readValues(long recordsToRead) {

   protected abstract void readField(long recordsToRead);

-  /*
-  public Future<Boolean> determineSizeAsync(long recordsReadInCurrentPass,
-      Integer lengthVarFieldsInCurrentRecord) throws IOException {
-    Future<Boolean> r = threadPool.submit(
-        new ColumnReaderDetermineSizeTask(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord));
-    return r;
-  }
-  */
-
   /**
    * Determines the size of a single value in a variable column.
    *
    * Return value indicates if we have finished a row group and should stop reading
    *
    * @param recordsReadInCurrentPass
-   * @ param lengthVarFieldsInCurrentRecord
+   * @param lengthVarFieldsInCurrentRecord
    * @return - true if we should stop reading
    * @throws IOException
    */
@@ -194,7 +185,7 @@ public boolean determineSize(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord)
       return true;
     }

-    //lengthVarFieldsInCurrentRecord += dataTypeLengthInBits;
+    //lengthVarFieldsInCurrentRecord += dataTypeLengthInBits; // Never used in this code path. Hard to remove because the method is overidden by subclasses
     lengthVarFieldsInCurrentRecord = -1;

     doneReading = checkVectorCapacityReached();
@@ -307,41 +298,18 @@ public ColumnReaderProcessPagesTask(long recordsToReadInThisPass){
     @Override public Long call() throws IOException{

       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
-
-      this.parent.processPages(recordsToReadInThisPass);
-
-      Thread.currentThread().setName(oldname);
-      return recordsToReadInThisPass;
-    }
-
-  }
-
-  /*
-  private class ColumnReaderDetermineSizeTask implements Callable<Boolean> {
-
-    private final ColumnReader parent = ColumnReader.this;
-    private final long recordsReadInCurrentPass;
-    private final Integer lengthVarFieldsInCurrentRecord;
-
-    public ColumnReaderDetermineSizeTask(long recordsReadInCurrentPass, Integer lengthVarFieldsInCurrentRecord){
-      this.recordsReadInCurrentPass = recordsReadInCurrentPass;
-      this.lengthVarFieldsInCurrentRecord = lengthVarFieldsInCurrentRecord;
-    }
-
-    @Override public Boolean call() throws IOException{
-      String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName(oldname+"Decode-"+this.parent.columnChunkMetaData.toString());
-
-      boolean b = this.parent.determineSize(recordsReadInCurrentPass, lengthVarFieldsInCurrentRecord);
-
-      Thread.currentThread().setName(oldname);
-      return b;
-    }
-
-  }
-  */
+      try {
+        Thread.currentThread().setName(oldname + "Decode-" + this.parent.columnChunkMetaData.toString());
+        this.parent.processPages(recordsToReadInThisPass);
+        return recordsToReadInThisPass;
+      } finally {
+        Thread.currentThread().setName(oldname);
+      }
+    }
+  }

   private class ColumnReaderReadRecordsTask implements Callable<Integer> {
@@ -355,12 +323,15 @@ public ColumnReaderReadRecordsTask(int recordsToRead){
     @Override public Integer call() throws IOException{

       String oldname = Thread.currentThread().getName();
-      Thread.currentThread().setName("Decode-"+this.parent.columnChunkMetaData.toString());
-
-      this.parent.readRecords(recordsToRead);
-
-      Thread.currentThread().setName(oldname);
-      return recordsToRead;
+      try {
+        Thread.currentThread().setName("Decode-" + this.parent.columnChunkMetaData.toString());
+        this.parent.readRecords(recordsToRead);
+        return recordsToRead;
+      } finally {
+        Thread.currentThread().setName(oldname);
+      }
     }
   }
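Both tasks above now restore the worker's thread name in a finally block; previously an exception in processPages()/readRecords() skipped the restore and left pool threads carrying stale "Decode-..." names. The general shape of the fix (illustrative code, not the Drill classes):

    import java.util.concurrent.Callable;

    public class NamedTask implements Callable<Integer> {
      private final String label;

      NamedTask(String label) { this.label = label; }

      @Override public Integer call() throws Exception {
        final String oldName = Thread.currentThread().getName();
        try {
          Thread.currentThread().setName("Decode-" + label);
          return doWork();
        } finally {
          Thread.currentThread().setName(oldName); // runs even if doWork() throws
        }
      }

      private Integer doWork() { return 42; } // stands in for the real decode work
    }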
@@ -150,7 +150,10 @@ protected void readField(long recordsToReadInThisPass) {
       for (int i = 0; i < recordsToReadInThisPass; i++){
         valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.dictionaryValueReader.readInteger());
       }
+      int writerIndex = castedBaseVector.getBuffer().writerIndex();
+      castedBaseVector.getBuffer().setIndex(0, writerIndex + (int)readLength);
     } else {
       for (int i = 0; i < recordsToReadInThisPass; i++){
         valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, pageReader.valueReader.readInteger());
       }
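This is the "writer index for int64 dictionary encoded types" fix from the commit message. The vector mutator writes with absolute set*() calls, which never advance the underlying Netty buffer's writerIndex, so after a dictionary-decoded batch the index must be bumped by the number of bytes written. A minimal demonstration of that ByteBuf behavior (assumes netty-buffer on the classpath):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;

    public class WriterIndexDemo {
      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(64);
        buf.setLong(0, 123456789L);            // absolute write: writerIndex stays 0
        System.out.println(buf.writerIndex()); // 0

        int writerIndex = buf.writerIndex();
        buf.setIndex(0, writerIndex + 8);      // advance by the bytes actually written
        System.out.println(buf.writerIndex()); // 8
        buf.release();
      }
    }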
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet.columnreaders;

 import com.google.common.base.Stopwatch;
+import io.netty.buffer.ByteBufUtil;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.util.filereader.BufferedDirectBufInputStream;
 import io.netty.buffer.ByteBuf;
@@ -63,9 +64,8 @@ class PageReader {
   public static final ParquetMetadataConverter METADATA_CONVERTER = ParquetFormatPlugin.parquetMetadataConverter;

   protected final org.apache.drill.exec.store.parquet.columnreaders.ColumnReader<?> parentColumnReader;
-  //private final ColumnDataReader dataReader;
   protected final DirectBufInputStream dataReader;
-  //der; buffer to store bytes of current page
+  //buffer to store bytes of current page
   protected DrillBuf pageData;

   // for variable length data we need to keep track of our current position in the page data
@@ -189,6 +189,11 @@ private DrillBuf readPage(PageHeader pageHeader, int compressedSize, int uncompressedSize)
     if (parentColumnReader.columnChunkMetaData.getCodec() == CompressionCodecName.UNCOMPRESSED) {
       timer.start();
       pageDataBuf = dataReader.getNext(compressedSize);
+      if (logger.isTraceEnabled()) {
+        logger.trace("PageReaderTask==> Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
+            parentColumnReader.columnChunkMetaData.toString(), dataReader.getPos(),
+            pageHeader.getUncompressed_page_size(), ByteBufUtil.hexDump(pageData));
+      }
       timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
       this.updateStats(pageHeader, "Page Read", start, timeToRead, compressedSize, uncompressedSize);
     } else {
@@ -247,9 +252,6 @@ protected void nextInternal() throws IOException{
       }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);

-    //TODO: Handle buffer allocation exception
-
-    //allocatePageData(pageHeader.getUncompressed_page_size());
     int compressedSize = pageHeader.getCompressed_page_size();
     int uncompressedSize = pageHeader.getUncompressed_page_size();
     pageData = readPage(pageHeader, compressedSize, uncompressedSize);
@@ -270,7 +272,7 @@ public boolean next() throws IOException {

     // TODO - the metatdata for total size appears to be incorrect for impala generated files, need to find cause
     // and submit a bug report
-    if(!dataReader.hasRemainder() || parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
+    if(parentColumnReader.totalValuesRead == parentColumnReader.columnChunkMetaData.getValueCount()) {
       return false;
     }
     clearBuffers();
@@ -395,7 +397,7 @@ protected void clearDictionaryBuffers() {

   public void clear(){
     try {
-      this.inputStream.close();
+      // data reader also owns the input stream and will close it.
       this.dataReader.close();
     } catch (IOException e) {
       //Swallow the exception which is OK for input streams
