[SPARK-24133][SQL] Check for integer overflows when resizing WritableColumnVectors

## What changes were proposed in this pull request?

`ColumnVector`s store string data in one big byte array. Since the array size is capped at just under Integer.MAX_VALUE, a single `ColumnVector` cannot store more than 2 GB of string data.
But since Parquet files commonly contain large blobs stored as strings, and `ColumnVector`s by default carry 4096 values, it is entirely possible to go past that limit. In such cases, a negative capacity is requested from `WritableColumnVector.reserve()`. The call succeeds (the requested capacity is smaller than the already allocated capacity), and consequently `java.lang.ArrayIndexOutOfBoundsException` is thrown when the reader actually attempts to put the data into the array.
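For intuition, here is a minimal sketch of the arithmetic (in Scala, with made-up value sizes; it mirrors the overflow, not the reader's actual call path) showing how the 32-bit capacity request wraps negative:

```scala
// Illustrative numbers: a batch of 4096 values averaging ~600 KB each needs
// more contiguous bytes than Int.MaxValue, so a 32-bit size wraps negative.
object OverflowSketch extends App {
  val valuesPerBatch = 4096                  // default vectorized reader batch size
  val bytesPerValue  = 600 * 1000            // ~600 KB blob stored as a string
  val neededBytes    = valuesPerBatch.toLong * bytesPerValue
  println(neededBytes)                       // 2457600000, larger than Int.MaxValue
  val requestedCapacity = neededBytes.toInt  // wraps around to a negative Int
  println(requestedCapacity)                 // -1837367296
}
```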

This change introduces a simple check for integer overflow to `WritableColumnVector.reserve()`, which should help catch the error earlier and provide a more informative exception. Additionally, the error message in `WritableColumnVector.throwUnsupportedException()` was corrected, as it previously encouraged users to increase rather than reduce the batch size.
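As a usage note, the workaround suggested by the new error message looks roughly like the sketch below; the Parquet config keys shown are assumed to be what these `SQLConf` entries resolve to, so verify them against your Spark build:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("large-string-blobs").getOrCreate()

// Fewer rows per batch means fewer string bytes per ColumnVector,
// keeping the total under the ~2 GB byte-array limit.
spark.conf.set("spark.sql.parquet.columnarReaderBatchSize", 512)

// Alternatively, fall back to the non-vectorized Parquet reader.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
```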

## How was this patch tested?

New unit tests were added.

Author: Ala Luszczak <ala@databricks.com>

Closes #21206 from ala/overflow-reserve.
ala authored and gatorsmile committed May 2, 2018
1 parent 8dbf56c commit 8bd2702
Showing 2 changed files with 20 additions and 8 deletions.
@@ -81,7 +81,9 @@ public void close() {
}

public void reserve(int requiredCapacity) {
if (requiredCapacity > capacity) {
if (requiredCapacity < 0) {
throwUnsupportedException(requiredCapacity, null);
} else if (requiredCapacity > capacity) {
int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
if (requiredCapacity <= newCapacity) {
try {
Expand All @@ -96,13 +98,16 @@ public void reserve(int requiredCapacity) {
}

private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
"(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
"vectorized reader, or increase the vectorized reader batch size. For parquet file " +
"format, refer to " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + " and " +
SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().key() + "; for orc file format, refer to " +
SQLConf.ORC_VECTORIZED_READER_ENABLED().key() + " and " +
SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().key() + ".";
String message = "Cannot reserve additional contiguous bytes in the vectorized reader (" +
(requiredCapacity >= 0 ? "requested " + requiredCapacity + " bytes" : "integer overflow") +
"). As a workaround, you can reduce the vectorized reader batch size, or disable the " +
"vectorized reader. For parquet file format, refer to " +
SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().key() +
" (default " + SQLConf.PARQUET_VECTORIZED_READER_BATCH_SIZE().defaultValueString() +
") and " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() + "; for orc file format, " +
"refer to " + SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().key() +
" (default " + SQLConf.ORC_VECTORIZED_READER_BATCH_SIZE().defaultValueString() +
") and " + SQLConf.ORC_VECTORIZED_READER_ENABLED().key() + ".";
throw new RuntimeException(message, cause);
}

@@ -1333,4 +1333,11 @@ class ColumnarBatchSuite extends SparkFunSuite {

column.close()
}

testVector("WritableColumnVector.reserve(): requested capacity is negative", 1024, ByteType) {
column =>
val ex = intercept[RuntimeException] { column.reserve(-1) }
assert(ex.getMessage.contains(
"Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow)"))
}
}
