[SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations

## What changes were proposed in this pull request?

In several places, avoid allocating arrays bigger than Integer.MAX_VALUE - 8 elements, which is the actual maximum array size on some JVMs.
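
The recurring fix is to define a conservative cap, `Integer.MAX_VALUE - 8`, and compare against it in a way that cannot itself overflow. A minimal sketch of the idiom, with class and method names invented for illustration (not taken from the patch):

```java
// Sketch of the size-check idiom used across this patch; names are illustrative.
public final class ArraySizeSketch {

  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; the practical
  // maximum is slightly smaller, so leave a little headroom.
  static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

  // Overflow-safe check: "currentSize + neededSize > ARRAY_MAX" could wrap
  // around to a negative number, so subtract on the right-hand side instead.
  static void ensureRoom(int currentSize, int neededSize) {
    if (neededSize > ARRAY_MAX - currentSize) {
      throw new UnsupportedOperationException(
        "Cannot grow past size limitation " + ARRAY_MAX);
    }
  }

  public static void main(String[] args) {
    ensureRoom(1024, 4096);                 // fine
    ensureRoom(Integer.MAX_VALUE - 4, 16);  // throws: would exceed the cap
  }
}
```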

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19266 from srowen/SPARK-22033.
srowen committed Sep 23, 2017
1 parent 3920af7 commit 50ada2a
Showing 6 changed files with 32 additions and 19 deletions.
@@ -39,7 +39,7 @@ public final class LongArray {
   private final long length;
 
   public LongArray(MemoryBlock memory) {
-    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size > 4 billion elements";
+    assert memory.size() < (long) Integer.MAX_VALUE * 8: "Array size >= Integer.MAX_VALUE elements";
     this.memory = memory;
     this.baseObj = memory.getBaseObject();
     this.baseOffset = memory.getBaseOffset();
@@ -30,11 +30,17 @@ public interface HashMapGrowthStrategy {
   HashMapGrowthStrategy DOUBLING = new Doubling();
 
   class Doubling implements HashMapGrowthStrategy {
+
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
     @Override
     public int nextCapacity(int currentCapacity) {
       assert (currentCapacity > 0);
-      // Guard against overflow
-      return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
+      int doubleCapacity = currentCapacity * 2;
+      return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
     }
   }

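At the boundary the behavior change is easy to see: once the capacity reaches 2^30, doubling overflows a 32-bit int, and the old code fell back to `Integer.MAX_VALUE`, a length some JVMs refuse to allocate; the new code falls back to `ARRAY_MAX` instead. A standalone re-implementation of both versions for comparison (not the Spark class itself):

```java
public class DoublingBoundary {
  static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

  // Old behavior: guard only against int overflow, cap at Integer.MAX_VALUE.
  static int oldNextCapacity(int currentCapacity) {
    return (currentCapacity * 2 > 0) ? (currentCapacity * 2) : Integer.MAX_VALUE;
  }

  // New behavior: also respect the conservative array-size cap.
  static int newNextCapacity(int currentCapacity) {
    int doubleCapacity = currentCapacity * 2;
    return (doubleCapacity > 0 && doubleCapacity <= ARRAY_MAX) ? doubleCapacity : ARRAY_MAX;
  }

  public static void main(String[] args) {
    int atLimit = 1 << 30;                        // 1073741824; doubling overflows int
    System.out.println(oldNextCapacity(atLimit)); // 2147483647 -- may not be allocatable
    System.out.println(newNextCapacity(atLimit)); // 2147483639 -- stays under the VM limit
  }
}
```
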
@@ -126,22 +126,22 @@ private[spark] class CompactBuffer[T: ClassTag] extends Seq[T] with Serializable
 
   /** Increase our size to newSize and grow the backing array if needed. */
   private def growToSize(newSize: Int): Unit = {
-    if (newSize < 0) {
-      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+    // smaller. Be conservative and lower the cap a little.
+    val arrayMax = Int.MaxValue - 8
+    if (newSize < 0 || newSize - 2 > arrayMax) {
+      throw new UnsupportedOperationException(s"Can't grow buffer past $arrayMax elements")
     }
     val capacity = if (otherElements != null) otherElements.length + 2 else 2
     if (newSize > capacity) {
-      var newArrayLen = 8
+      var newArrayLen = 8L
       while (newSize - 2 > newArrayLen) {
         newArrayLen *= 2
-        if (newArrayLen == Int.MinValue) {
-          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
-          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
-          // calculation above still gives a positive integer.
-          newArrayLen = Int.MaxValue - 2
-        }
       }
-      val newArray = new Array[T](newArrayLen)
+      if (newArrayLen > arrayMax) {
+        newArrayLen = arrayMax
+      }
+      val newArray = new Array[T](newArrayLen.toInt)
       if (otherElements != null) {
         System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
       }
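
The CompactBuffer change replaces the special-cased `Int.MinValue` check with a simpler scheme: compute the doubled length in a 64-bit `long`, which cannot overflow for these values, clamp it to the array cap, and only then narrow back to `Int`. A Java sketch of the same technique, assumed equivalent to the Scala code above (names are illustrative):

```java
// Java sketch of the "widen, double, clamp, narrow" technique used in
// CompactBuffer.growToSize; names are illustrative.
public class GrowWithLong {
  static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

  static int targetArrayLength(int newSize) {
    long newArrayLen = 8L;              // widen to long so doubling can't overflow
    while (newSize - 2 > newArrayLen) {
      newArrayLen *= 2;
    }
    if (newArrayLen > ARRAY_MAX) {      // clamp to what the VM can actually allocate
      newArrayLen = ARRAY_MAX;
    }
    return (int) newArrayLen;           // safe: value is now <= ARRAY_MAX
  }

  public static void main(String[] args) {
    System.out.println(targetArrayLength(100));                    // 128
    System.out.println(targetArrayLength(Integer.MAX_VALUE - 8));  // clamped to 2147483639
  }
}
```
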
@@ -25,7 +25,7 @@ import org.apache.spark.util.collection.WritablePartitionedPairCollection._
  * Append-only buffer of key-value pairs, each with a corresponding partition ID, that keeps track
  * of its estimated size in bytes.
  *
- * The buffer can support up to `1073741823 (2 ^ 30 - 1)` elements.
+ * The buffer can support up to 1073741819 elements.
  */
 private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
   extends WritablePartitionedPairCollection[K, V] with SizeTracker
@@ -59,7 +59,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
       throw new IllegalStateException(s"Can't insert more than ${MAXIMUM_CAPACITY} elements")
     }
     val newCapacity =
-      if (capacity * 2 < 0 || capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
+      if (capacity * 2 > MAXIMUM_CAPACITY) { // Overflow
        MAXIMUM_CAPACITY
       } else {
        capacity * 2
@@ -96,5 +96,7 @@ private[spark] class PartitionedPairBuffer[K, V](initialCapacity: Int = 64)
 }
 
 private object PartitionedPairBuffer {
-  val MAXIMUM_CAPACITY = Int.MaxValue / 2 // 2 ^ 30 - 1
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  val MAXIMUM_CAPACITY: Int = (Int.MaxValue - 8) / 2
 }
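
The halved cap and the 1073741819 quoted in the updated doc comment follow from the buffer's layout: PartitionedPairBuffer keeps keys and values side by side in one backing array, two slots per element (a detail of the class, not visible in this hunk), so the element limit is half the conservative array limit. A quick check of the arithmetic:

```java
// Verifies the numbers quoted in the PartitionedPairBuffer change.
public class CapacityMath {
  public static void main(String[] args) {
    int arrayMax = Integer.MAX_VALUE - 8;             // 2147483639
    int maxCapacity = arrayMax / 2;                   // element cap
    System.out.println(maxCapacity);                  // 1073741819, as in the doc comment
    System.out.println(2L * maxCapacity <= arrayMax); // true: two slots per element still fit
  }
}
```
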
@@ -35,6 +35,11 @@
  * if the fields of row are all fixed-length, as the size of result row is also fixed.
  */
 public class BufferHolder {
+
+  // Some JVMs can't allocate arrays of length Integer.MAX_VALUE; actual max is somewhat
+  // smaller. Be conservative and lower the cap a little.
+  private static final int ARRAY_MAX = Integer.MAX_VALUE - 8;
+
   public byte[] buffer;
   public int cursor = Platform.BYTE_ARRAY_OFFSET;
   private final UnsafeRow row;
@@ -61,15 +66,15 @@ public BufferHolder(UnsafeRow row, int initialSize) {
    * Grows the buffer by at least neededSize and points the row to the buffer.
    */
   public void grow(int neededSize) {
-    if (neededSize > Integer.MAX_VALUE - totalSize()) {
+    if (neededSize > ARRAY_MAX - totalSize()) {
       throw new UnsupportedOperationException(
         "Cannot grow BufferHolder by size " + neededSize + " because the size after growing " +
-          "exceeds size limitation " + Integer.MAX_VALUE);
+          "exceeds size limitation " + ARRAY_MAX);
     }
     final int length = totalSize() + neededSize;
     if (buffer.length < length) {
       // This will not happen frequently, because the buffer is re-used.
-      int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : Integer.MAX_VALUE;
+      int newLength = length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
       final byte[] tmp = new byte[newLength];
       Platform.copyMemory(
         buffer,
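
In BufferHolder.grow, note how the doubling itself is kept overflow-free: `length * 2` is only evaluated after confirming `length < ARRAY_MAX / 2`; otherwise the code jumps straight to the cap. A small standalone illustration of that guard (not the Spark class itself; names invented):

```java
// Illustrates the guarded doubling in the BufferHolder change.
public class GuardedDoubling {
  static final int ARRAY_MAX = Integer.MAX_VALUE - 8;

  static int newLength(int length) {
    // Only double when the result is guaranteed to stay under the cap;
    // otherwise use the cap directly, so length * 2 can never overflow here.
    return length < ARRAY_MAX / 2 ? length * 2 : ARRAY_MAX;
  }

  public static void main(String[] args) {
    System.out.println(newLength(1_000_000));      // 2000000
    System.out.println(newLength(1_500_000_000));  // 2147483639 (ARRAY_MAX)
  }
}
```
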
@@ -559,7 +559,7 @@ public final int appendStruct(boolean isNull) {
    * Upper limit for the maximum capacity for this column.
    */
   @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+  protected int MAX_CAPACITY = Integer.MAX_VALUE - 8;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
