DRILL-7100: Fixed IllegalArgumentException when reading Parquet data
sachouche authored and ilooner committed Mar 13, 2019
1 parent e5e8419 commit b20a2e6
Showing 5 changed files with 73 additions and 54 deletions.
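Summary of the fix: the Parquet flat reader's batch-sizing arithmetic (column precision, per-vector byte counts, batch memory quotas) was carried in int, so a large batch could overflow to a negative size and surface as an IllegalArgumentException during allocation. The diff below widens that arithmetic to long, introduces a long overload of the power-of-two rounding helper (BaseAllocator.longNextPowerOfTwo), and guards the one remaining narrowing cast with a Preconditions check. A minimal editorial sketch of the likely failure mode, using assumed input values and a hypothetical stand-in for the old int-based helper (not Drill code):

// Minimal sketch (editorial, not part of the commit) of the int overflow this fixes.
public class OverflowSketch {
  // Hypothetical stand-in for the old int-based BaseAllocator.nextPowerOfTwo.
  static int nextPowerOfTwo(int val) {
    int highestBit = Integer.highestOneBit(val);
    return (highestBit == val) ? val : highestBit << 1;
  }

  public static void main(String[] args) {
    int avgPrecision = 50_000;               // assumed average value width in bytes
    int valueCount = 50_000;                 // assumed records per batch
    int product = avgPrecision * valueCount; // 2,500,000,000 wraps to -1,794,967,296
    System.out.println(product);             // negative: int arithmetic overflowed
    System.out.println(nextPowerOfTwo(product)); // 0: a nonsensical buffer size
    // A negative or zero size reaching the allocator surfaces as an
    // IllegalArgumentException; doing the arithmetic in long avoids the wrap.
    long safe = (long) avgPrecision * valueCount;
    System.out.println(safe);                // 2500000000
  }
}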
@@ -91,7 +91,7 @@ boolean onEndOfBatch(int batchNumRecords, List<VarLenColumnBatchStats> batchStat
// do not account for null values as we are interested in the
// actual data that is being stored within a batch.
BatchSizingMemoryUtil.getMemoryUsage(stat.vector, stat.numValuesRead, vectorMemoryUsage);
- final int batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);
+ final long batchColumnPrecision = Math.max(1, vectorMemoryUsage.dataBytesUsed / stat.numValuesRead);

double currAvgPrecision = columnPrecisionStats.avgPrecision;
double newAvgPrecision = ((numBatches - 1) * currAvgPrecision + batchColumnPrecision) / numBatches;
@@ -138,7 +138,7 @@ private static final class ColumnPrecisionStats {
/** Materialized field */
private final MaterializedField field;
/** Average column precision */
- private int avgPrecision;
+ private long avgPrecision;

private ColumnPrecisionStats(MaterializedField field) {
this.field = field;
Expand Down
@@ -59,30 +59,30 @@ public final class BatchSizingMemoryUtil {
* limit; false otherwise
*/
public static boolean canAddNewData(ColumnMemoryUsageInfo columnMemoryUsage,
- int newBitsMemory,
- int newOffsetsMemory,
- int newDataMemory) {
+ long newBitsMemory,
+ long newOffsetsMemory,
+ long newDataMemory) {

// First we need to update the vector memory usage
final VectorMemoryUsageInfo vectorMemoryUsage = columnMemoryUsage.vectorMemoryUsage;
getMemoryUsage(columnMemoryUsage.vector, columnMemoryUsage.currValueCount, vectorMemoryUsage);

// We need to compute the new ValueVector memory usage if we attempt to add the new payload
// Parameters: usedCapacity, newPayload, currentCapacity
- int totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
+ long totalBitsMemory = computeNewVectorCapacity(vectorMemoryUsage.bitsBytesUsed,
newBitsMemory,
vectorMemoryUsage.bitsBytesCapacity);

- int totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
+ long totalOffsetsMemory = computeNewVectorCapacity(vectorMemoryUsage.offsetsBytesUsed,
newOffsetsMemory,
vectorMemoryUsage.offsetsByteCapacity);

- int totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
+ long totalDataMemory = computeNewVectorCapacity(vectorMemoryUsage.dataBytesUsed,
newDataMemory,
vectorMemoryUsage.dataByteCapacity);

// Alright now we can figure out whether the new payload will take us over the maximum memory threshold
- int totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
+ long totalMemory = totalBitsMemory + totalOffsetsMemory + totalDataMemory;
assert totalMemory >= 0;

return totalMemory <= columnMemoryUsage.memoryQuota.getMaxMemoryUsage();
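// Editorial note, not part of the commit: with the long overloads, a probe such as
// canAddNewData(columnMemoryUsage, 0, 0, 3_000_000_000L) no longer wraps negative;
// each buffer is projected as max(currentCapacity, longNextPowerOfTwo(used + newPayload)),
// and the three projections are summed and checked against the column's memory quota.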
@@ -227,16 +227,16 @@ public static int getAvgVariableLengthColumnTypePrecision(ParquetColumnMetadata
* @param valueCount number of column values
* @return memory size required to store "valueCount" within a value vector
*/
- public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
+ public static long computeFixedLengthVectorMemory(ParquetColumnMetadata column, int valueCount) {
assert column.isFixedLength();

// Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any)
// + next-power-of-two(DT_LEN * valueCount) // data storage

- int memoryUsage = BaseAllocator.nextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);
+ long memoryUsage = BaseAllocator.longNextPowerOfTwo(getFixedColumnTypePrecision(column) * valueCount);

if (column.getField().isNullable()) {
- memoryUsage += BaseAllocator.nextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(BYTE_VALUE_WIDTH * valueCount);
}

return memoryUsage;
@@ -248,19 +248,19 @@ public static int computeFixedLengthVectorMemory(ParquetColumnMetadata column, i
* @param valueCount number of column values
* @return memory size required to store "valueCount" within a value vector
*/
- public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column,
- int averagePrecision, int valueCount) {
+ public static long computeVariableLengthVectorMemory(ParquetColumnMetadata column,
+ long averagePrecision, int valueCount) {

assert !column.isFixedLength();

// Formula: memory-usage = next-power-of-two(byte-size * valueCount) // nullable storage (if any)
// + next-power-of-two(int-size * valueCount) // offsets storage
// + next-power-of-two(DT_LEN * valueCount) // data storage
- int memoryUsage = BaseAllocator.nextPowerOfTwo(averagePrecision * valueCount);
- memoryUsage += BaseAllocator.nextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));
+ long memoryUsage = BaseAllocator.longNextPowerOfTwo(averagePrecision * valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(INT_VALUE_WIDTH * (valueCount + 1));

if (column.getField().isNullable()) {
- memoryUsage += BaseAllocator.nextPowerOfTwo(valueCount);
+ memoryUsage += BaseAllocator.longNextPowerOfTwo(valueCount);
}
return memoryUsage;
}
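// Editorial worked example, not part of the commit, assuming INT_VALUE_WIDTH is 4:
// a nullable variable-length column with averagePrecision = 20 and valueCount = 4096 needs
//   longNextPowerOfTwo(20 * 4096) = 131072 bytes  (data)
// + longNextPowerOfTwo(4 * 4097)  =  32768 bytes  (offsets)
// + longNextPowerOfTwo(4096)      =   4096 bytes  (nullable bits)
// = 167936 bytes in total.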
@@ -269,8 +269,8 @@ public static int computeVariableLengthVectorMemory(ParquetColumnMetadata column
// Internal implementation
// ----------------------------------------------------------------------------

- private static int computeNewVectorCapacity(int usedCapacity, int newPayload, int currentCapacity) {
- int newUsedCapacity = BaseAllocator.nextPowerOfTwo(usedCapacity + newPayload);
+ private static long computeNewVectorCapacity(long usedCapacity, long newPayload, long currentCapacity) {
+ long newUsedCapacity = BaseAllocator.longNextPowerOfTwo(usedCapacity + newPayload);
assert newUsedCapacity >= 0;

return Math.max(currentCapacity, newUsedCapacity);
@@ -299,17 +299,17 @@ public static final class ColumnMemoryUsageInfo {
*/
public static final class VectorMemoryUsageInfo {
/** Bits vector capacity */
- public int bitsBytesCapacity;
+ public long bitsBytesCapacity;
/** Offsets vector capacity */
- public int offsetsByteCapacity;
+ public long offsetsByteCapacity;
/** Data vector capacity */
- public int dataByteCapacity;
+ public long dataByteCapacity;
/** Bits vector used up capacity */
- public int bitsBytesUsed;
+ public long bitsBytesUsed;
/** Offsets vector used up capacity */
- public int offsetsBytesUsed;
+ public long offsetsBytesUsed;
/** Data vector used up capacity */
- public int dataBytesUsed;
+ public long dataBytesUsed;

public void reset() {
bitsBytesCapacity = 0;
@@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.drill.common.map.CaseInsensitiveMap;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.exec.ExecConstants;
@@ -34,6 +35,7 @@
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.AllocationHelper;
import org.apache.drill.exec.vector.ValueVector;
+ import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;

/**
* This class is tasked with managing all aspects of flat Parquet reader record batch sizing logic.
@@ -56,11 +58,11 @@ public final class RecordBatchSizerManager {
/** Configured Parquet records per batch */
private final int configRecordsPerBatch;
/** Configured Parquet memory size per batch */
- private final int configMemorySizePerBatch;
+ private final long configMemorySizePerBatch;
/** An upper bound on the Parquet records per batch based on the configured value and schema */
private int maxRecordsPerBatch;
/** An upper bound on the Parquet memory size per batch based on the configured value and schema */
- private int maxMemorySizePerBatch;
+ private long maxMemorySizePerBatch;
/** The current number of records per batch as it can be dynamically optimized */
private int recordsPerBatch;

@@ -162,7 +164,8 @@ public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryExcep
ColumnMemoryInfo columnMemoryInfo = columnMemoryInfoMap.get(v.getField().getName());

if (columnMemoryInfo != null) {
- AllocationHelper.allocate(v, recordsPerBatch, columnMemoryInfo.columnPrecision, 0);
+ Preconditions.checkState(columnMemoryInfo.columnPrecision <= Integer.MAX_VALUE, "Column precision cannot exceed 2GB");
+ AllocationHelper.allocate(v, recordsPerBatch, (int) columnMemoryInfo.columnPrecision, 0);
} else {
// This column was found in another Parquet file but not the current one; so we inject
// a null value. At this time, we do not account for such columns. Why? the right design is
@@ -219,7 +222,7 @@ public int getCurrentRecordsPerBatch() {
/**
* @return current total memory per batch (may change across batches)
*/
- public int getCurrentMemorySizePerBatch() {
+ public long getCurrentMemorySizePerBatch() {
return maxMemorySizePerBatch; // Current logic doesn't mutate the max-memory after it has been set
}

@@ -233,7 +236,7 @@ public int getConfigRecordsPerBatch() {
/**
* @return configured memory size per batch (may be different from the enforced one)
*/
- public int getConfigMemorySizePerBatch() {
+ public long getConfigMemorySizePerBatch() {
return configMemorySizePerBatch;
}

@@ -265,13 +268,13 @@ public void close() {
// Internal implementation logic
// ----------------------------------------------------------------------------

- private int getConfiguredMaxBatchMemory(OptionManager options) {
+ private long getConfiguredMaxBatchMemory(OptionManager options) {
// Use the parquet specific configuration if set
- int maxMemory = (int) options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);
+ long maxMemory = options.getLong(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE);

// Otherwise, use the common property
if (maxMemory <= 0) {
- maxMemory = (int) options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
+ maxMemory = options.getLong(ExecConstants.OUTPUT_BATCH_SIZE);
}
return maxMemory;
}
@@ -304,8 +307,8 @@ private int normalizeNumRecordsPerBatch() {
return normalizedNumRecords;
}

- private int normalizeMemorySizePerBatch() {
- int normalizedMemorySize = configMemorySizePerBatch;
+ private long normalizeMemorySizePerBatch() {
+ long normalizedMemorySize = configMemorySizePerBatch;

if (normalizedMemorySize <= 0) {
final String message = String.format("Invalid Parquet memory per batch [%d] byte(s)",
Expand All @@ -321,10 +324,10 @@ private int normalizeMemorySizePerBatch() {
return normalizedMemorySize; // NOOP
}

- final int memorySizePerColumn = normalizedMemorySize / numColumns;
+ final long memorySizePerColumn = normalizedMemorySize / numColumns;

if (memorySizePerColumn < MIN_COLUMN_MEMORY_SZ) {
- final int prevValue = normalizedMemorySize;
+ final long prevValue = normalizedMemorySize;
normalizedMemorySize = MIN_COLUMN_MEMORY_SZ * numColumns;

final String message = String.format("The Parquet memory per batch [%d] byte(s) is too low for this query ; using [%d] bytes",
@@ -444,9 +447,9 @@ private void distributeExtraMemorySpace(MemoryRequirementContainer requiredMemor
return; // we're done
}

- final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
- final int extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
- final int perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;
+ final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ final long extraMemorySpace = maxMemorySizePerBatch - totalMemoryNeeded;
+ final long perColumnExtraSpace = extraMemorySpace / numVariableLengthColumns;

if (perColumnExtraSpace == 0) {
return;
@@ -481,7 +484,7 @@ private boolean releaseFieldOverflowContainer(String field, boolean remove) {
return remove;
}

- private int computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
+ private long computeVectorMemory(ColumnMemoryInfo columnInfo, int numValues) {
if (columnInfo.columnMeta.isFixedLength()) {
return BatchSizingMemoryUtil.computeFixedLengthVectorMemory(columnInfo.columnMeta, numValues);
}
@@ -506,7 +509,7 @@ private double computeNeededMemoryRatio(MemoryRequirementContainer requiredMemor
requiredMemory.variableLenRequiredMemory += columnInfo.columnMemoryQuota.maxMemoryUsage;
}

- final int totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
+ final long totalMemoryNeeded = requiredMemory.fixedLenRequiredMemory + requiredMemory.variableLenRequiredMemory;
assert totalMemoryNeeded > 0;

double neededMemoryRatio = ((double) maxMemorySizePerBatch) / totalMemoryNeeded;
@@ -612,7 +615,7 @@ public VarLenColumnBatchStats(ValueVector vector, int numValuesRead) {
/** Field memory quota */
public static final class ColumnMemoryQuota {
/** Maximum cumulative memory that could be used */
- private int maxMemoryUsage;
+ private long maxMemoryUsage;
/** Maximum number of values that could be inserted */
private int maxNumValues;

@@ -622,14 +625,14 @@ public ColumnMemoryQuota() {
/**
* @param maxMemoryUsage maximum cumulative memory that could be used
*/
- public ColumnMemoryQuota(int maxMemoryUsage) {
+ public ColumnMemoryQuota(long maxMemoryUsage) {
this.maxMemoryUsage = maxMemoryUsage;
}

/**
* @return the maxMemoryUsage
*/
- public int getMaxMemoryUsage() {
+ public long getMaxMemoryUsage() {
return maxMemoryUsage;
}

@@ -651,17 +654,17 @@ static final class ColumnMemoryInfo {
/** Column metadata */
ParquetColumnMetadata columnMeta;
/** Column value precision (maximum length for VL columns) */
- int columnPrecision;
+ long columnPrecision;
/** Column current memory quota within a batch */
final ColumnMemoryQuota columnMemoryQuota = new ColumnMemoryQuota();
}

/** Memory requirements container */
static final class MemoryRequirementContainer {
/** Memory needed for the fixed length columns given a specific record size */
- private int fixedLenRequiredMemory;
+ private long fixedLenRequiredMemory;
/** Memory needed for the variable length columns given a specific record size */
- private int variableLenRequiredMemory;
+ private long variableLenRequiredMemory;

private void reset() {
this.fixedLenRequiredMemory = 0;
@@ -93,9 +93,9 @@ private void testCanAddNewData(boolean isOptional) {

for (int columnIdx = 0; columnIdx < 3; columnIdx++) {
final ColumnMemoryUsageInfo columnInfo = columnMemoryInfo[columnIdx];
- final int remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
- final int remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
- final int remainingDataCapacity = getRemainingDataCapacity(columnInfo);
+ final long remainingBitsCapacity = getRemainingBitsCapacity(columnInfo);
+ final long remainingOffsetsCapacity = getRemainingOffsetsCapacity(columnInfo);
+ final long remainingDataCapacity = getRemainingDataCapacity(columnInfo);

// Test current VV is within quota (since we are not adding new entries)
Assert.assertTrue(BatchSizingMemoryUtil.canAddNewData(columnInfo, 0, 0, 0));
@@ -152,15 +152,15 @@ private ColumnMemoryUsageInfo getColumnMemoryUsageInfo(int columnIdx) {
return result;
}

- private static int getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingBitsCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.bitsBytesCapacity - columnInfo.vectorMemoryUsage.bitsBytesUsed;
}

- private static int getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingOffsetsCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.offsetsByteCapacity - columnInfo.vectorMemoryUsage.offsetsBytesUsed;
}

- private static int getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
+ private static long getRemainingDataCapacity(ColumnMemoryUsageInfo columnInfo) {
return columnInfo.vectorMemoryUsage.dataByteCapacity - columnInfo.vectorMemoryUsage.dataBytesUsed;
}

@@ -576,6 +576,22 @@ public static int nextPowerOfTwo(int val) {
}
}

+ /**
+ * Rounds up the provided value to the nearest power of two.
+ *
+ * @param val
+ * A long value.
+ * @return The nearest power of two greater than or equal to that value.
+ */
+ public static long longNextPowerOfTwo(long val) {
+ long highestBit = Long.highestOneBit(val);
+ if (highestBit == val) {
+ return val;
+ } else {
+ return highestBit << 1;
+ }
+ }

/**
* Verifies the accounting state of the allocator. Only works for DEBUG.
*
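For reference, a few illustrative calls to the new helper (editorial, not part of the commit; values are arbitrary examples):

long a = BaseAllocator.longNextPowerOfTwo(1000L);          // 1024: rounds up to the next power of two
long b = BaseAllocator.longNextPowerOfTwo(1024L);          // 1024: exact powers are returned unchanged
long c = BaseAllocator.longNextPowerOfTwo(2_500_000_000L); // 4294967296: well beyond int range, no wrap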
