DRILL-6579: Added sanity checks to the Parquet reader to avoid infinite loops

closes #1361
sachouche authored and sohami committed Jul 13, 2018
1 parent cad9aad commit 56f951cc7a03e497ba34eeca4bd9265ae30c4650
Showing 10 changed files with 33 additions and 11 deletions.
@@ -40,9 +40,9 @@ public int readBatch() throws Exception {
 ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
 int currBatchNumRecords = readState.batchSizerMgr().getCurrentRecordsPerBatch();
 long recordsToRead = Math.min(currBatchNumRecords, readState.getRemainingValuesToRead());
-int readCount = readRecords(firstColumnStatus, recordsToRead);
+int readCount = recordsToRead > 0 ? readRecords(firstColumnStatus, recordsToRead) : 0;
 
 readState.fillNullVectors(readCount);
 
 return readCount;
 }
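
The guard in this first hunk is the heart of the fix: recordsToRead can legitimately come out as zero (for example, once getRemainingValuesToRead() is exhausted), and unconditionally calling readRecords with a zero request risked a caller loop that never makes progress. Below is a minimal sketch of the failure mode and the guard, using hypothetical names rather than Drill's actual driver code:

// Minimal sketch (hypothetical names, not Drill's actual classes).
final class ReadLoopSketch {
  interface Reader {
    int read(long recordsToRead); // returns the number of records actually read
  }

  static int readBatch(Reader reader, long remaining, int batchLimit) {
    long recordsToRead = Math.min(batchLimit, remaining);
    // Guard: never ask the reader for a non-positive count; returning 0
    // immediately lets the caller detect end-of-input and stop looping.
    return recordsToRead > 0 ? reader.read(recordsToRead) : 0;
  }
}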

@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import com.google.common.base.Stopwatch;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -111,7 +112,7 @@ private int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {

 // Read the column data
 int readColumns = columnReader.readRecordsInBulk(batchNumRecords);
-assert readColumns <= batchNumRecords : "Reader cannot return more values than requested..";
+Preconditions.checkState(readColumns <= batchNumRecords, "Reader cannot return more values than requested.");
 
 if (!overflowCondition) {
 if (prevReadColumns >= 0 && prevReadColumns != readColumns) {
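
Swapping assert for Preconditions.checkState is what makes this sanity check effective in production: Java assertions are stripped at runtime unless the JVM is started with -ea, so the old check never fired in a normal deployment, while Guava's checkState always evaluates and throws IllegalStateException on violation. A small illustration of the difference:

import com.google.common.base.Preconditions;

class CheckDemo {
  static void validate(int readColumns, int batchNumRecords) {
    // Skipped entirely unless the JVM was launched with -ea:
    assert readColumns <= batchNumRecords : "Reader cannot return more values than requested.";

    // Always evaluated; throws IllegalStateException when the condition fails:
    Preconditions.checkState(readColumns <= batchNumRecords,
        "Reader cannot return more values than requested.");
  }
}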
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -106,11 +107,13 @@ final void set(PageDataInfo pageInfoInput, boolean clear) {
 pageInfo.dictionaryValueReader = pageInfoInput.dictionaryValueReader;
 pageInfo.numPageValues = pageInfoInput.numPageValues;
-buffer.clear();
+if (clear) {
+  buffer.clear();
+}
 }
 }
 
 final VarLenColumnBulkEntry getEntry(int valuesToRead) {
+Preconditions.checkArgument(valuesToRead > 0, "Number of values to read [%s] should be greater than zero", valuesToRead);
+
 VarLenColumnBulkEntry entry = null;
 
 // If there is overflow data, then we need to consume it first
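
Note the division of labor between the two Guava checks used throughout this commit: checkArgument validates caller-supplied input (here valuesToRead) and throws IllegalArgumentException, while checkState validates values the reader derived itself (such as readBatch in the hunks below) and throws IllegalStateException. A compact sketch of the convention, with hypothetical values:

import com.google.common.base.Preconditions;

class PreconditionKinds {
  void read(int valuesToRead) {
    // Caller passed a bad argument -> IllegalArgumentException
    Preconditions.checkArgument(valuesToRead > 0,
        "Number of values to read [%s] should be greater than zero", valuesToRead);

    // Internal computation went wrong -> IllegalStateException
    final int readBatch = Math.min(64, valuesToRead); // 64: hypothetical max entries
    Preconditions.checkState(readBatch > 0,
        "Read batch count [%s] should be greater than zero", readBatch);
  }
}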
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
@@ -50,6 +51,8 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
 final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
 final int[] valueLengths = entry.getValuesLength();
 final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
 final byte[] tgtBuff = entry.getInternalDataArray();
 final int tgtLen = tgtBuff.length;
 
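One formatting detail worth knowing when reading these messages: Guava's Preconditions substitutes only %s placeholders (it does not run full String.format), which is why the message templates here use %s even for integer values. For example:

import com.google.common.base.Preconditions;

class MessageTemplateDemo {
  public static void main(String[] args) {
    // Guava replaces %s placeholders positionally; %d would be left as literal text.
    Preconditions.checkState(false,
        "Read batch count [%s] should be greater than zero", 0);
    // throws: IllegalStateException: Read batch count [0] should be greater than zero
  }
}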
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
@@ -51,6 +52,8 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {

 final int[] valueLengths = entry.getValuesLength();
 final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
 final byte[] tgtBuff = entry.getInternalDataArray();
 final byte[] srcBuff = buffer.array();
 final int srcLen = buffer.remaining();
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
@@ -32,19 +33,19 @@ final class VarLenFixedEntryReader extends VarLenAbstractPageEntryReader {
 VarLenColumnBulkInputCallback containerCallback) {
 
 super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+Preconditions.checkArgument(columnPrecInfo.precision >= 0, "Fixed length precision [%s] cannot be lower than zero", columnPrecInfo.precision);
 }
 
 /** {@inheritDoc} */
 @Override
 final VarLenColumnBulkEntry getEntry(int valuesToRead) {
-assert columnPrecInfo.precision >= 0 : "Fixed length precision cannot be lower than zero";
-
 load(true); // load new data to process
 
 final int expectedDataLen = columnPrecInfo.precision;
 final int entrySz = 4 + columnPrecInfo.precision;
-final int maxValues = Math.min(entry.getMaxEntries(), (pageInfo.pageDataLen - pageInfo.pageDataOff) / entrySz);
-final int readBatch = Math.min(maxValues, valuesToRead);
+final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
 
 final int[] valueLengths = entry.getValuesLength();
 final byte[] tgtBuff = entry.getInternalDataArray();
 final byte[] srcBuff = buffer.array();
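
This hunk appears to contain the substantive loop-breaker for fixed-width values: under the removed arithmetic, whenever fewer than entrySz bytes remained in the page buffer, the integer division made maxValues zero, so readBatch was zero, the reader returned no values, and the caller could retry indefinitely. The new code sizes the batch the same way as the other entry readers and fails fast if the count ever comes out non-positive. A worked sketch of the old failure, under the assumption (taken from entrySz = 4 + precision above) that each entry occupies a 4-byte length prefix plus precision payload bytes:

// Sketch of the removed arithmetic that could yield a zero-size batch
// (hypothetical wrapper class, not Drill's actual reader).
class ZeroBatchSketch {
  static int oldReadBatch(int precision, int pageDataLen, int pageDataOff,
                          int maxEntries, int valuesToRead) {
    final int entrySz = 4 + precision; // 4-byte length prefix + fixed payload
    final int maxValues = Math.min(maxEntries, (pageDataLen - pageDataOff) / entrySz);
    return Math.min(maxValues, valuesToRead); // can be 0 -> no progress
  }

  public static void main(String[] args) {
    // precision = 16 -> entrySz = 20; only 15 bytes left -> 15 / 20 == 0,
    // so the reader was asked for 0 values, read nothing, and the caller spun.
    System.out.println(oldReadBatch(16, 100, 85, 1024, 128)); // prints 0
  }
}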
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.DictionaryReaderWrapper;
@@ -52,6 +53,8 @@ private final VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {
 final DictionaryReaderWrapper valueReader = pageInfo.dictionaryValueReader;
 final int[] valueLengths = entry.getValuesLength();
 final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
 final byte[] tgtBuff = entry.getInternalDataArray();
 final int tgtLen = tgtBuff.length;
 
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
@@ -53,6 +54,8 @@ VarLenColumnBulkEntry getEntryBulk(int valuesToRead) {

 final int[] valueLengths = entry.getValuesLength();
 final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
 final byte[] tgtBuff = entry.getInternalDataArray();
 final byte[] srcBuff = buffer.array();
 final int srcLen = buffer.remaining();
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.ColumnPrecisionInfo;
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.PageDataInfo;
@@ -33,19 +34,20 @@ final class VarLenNullableFixedEntryReader extends VarLenAbstractPageEntryReader
 VarLenColumnBulkInputCallback containerCallback) {
 
 super(buffer, pageInfo, columnPrecInfo, entry, containerCallback);
+Preconditions.checkArgument(columnPrecInfo.precision >= 0, "Fixed length precision cannot be lower than zero");
 }
 
 /** {@inheritDoc} */
 @Override
 final VarLenColumnBulkEntry getEntry(int valuesToRead) {
-assert columnPrecInfo.precision >= 0 : "Fixed length precision cannot be lower than zero";
-
 // TODO - We should not use force reload for sparse columns (values with lot of nulls)
 load(true); // load new data to process
 
 final int expectedDataLen = columnPrecInfo.precision;
 final int entrySz = 4 + columnPrecInfo.precision;
 final int readBatch = Math.min(entry.getMaxEntries(), valuesToRead);
+Preconditions.checkState(readBatch > 0, "Read batch count [%s] should be greater than zero", readBatch);
+
 final int[] valueLengths = entry.getValuesLength();
 final byte[] tgtBuff = entry.getInternalDataArray();
 final byte[] srcBuff = buffer.array();
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import com.google.common.base.Preconditions;
 import java.nio.ByteBuffer;
 
 import org.apache.drill.exec.store.parquet.columnreaders.VarLenColumnBulkInput.VarLenColumnBulkInputCallback;
@@ -80,6 +81,8 @@ VarLenColumnBulkEntry getEntry(int valuesToRead) {
 // load some overflow data for processing
 final int maxValues = Math.min(entry.getMaxEntries(), valuesToRead);
 final int numAvailableValues = overflowDataCache.load(overflowState.currValueIdx, maxValues);
+Preconditions.checkState(numAvailableValues > 0, "Number of values to read [%s] should be greater than zero", numAvailableValues);
+
 final int firstValueDataOffset = getDataBufferStartOffset() + adjustDataOffset(overflowState.currValueIdx);
 int totalDataLen = 0;
 int currValueIdx = overflowState.currValueIdx;
 