[SPARK-25904][CORE] Allocate arrays smaller than Int.MaxValue
JVMs can't allocate arrays of length exactly Int.MaxValue, so ensure we never try to allocate an array that big. This commit changes some defaults and configs to gracefully fall back to something that doesn't require one large array in some cases; in other cases it simply improves the error message for cases that will still fail.

Closes #22818 from squito/SPARK-25827.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
squito committed Nov 7, 2018
1 parent 9e9fa2f commit 8fbc183
Showing 7 changed files with 32 additions and 26 deletions.
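
For reference, the limit used throughout this diff is org.apache.spark.unsafe.array.ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, which the updated error messages spell out as Int.MaxValue - 15. A minimal, self-contained sketch of the capping pattern follows (illustrative only, with the constant mirrored locally rather than imported from spark-unsafe):

object SafeAllocation {
  // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH: JVMs reserve a few object header
  // words, so array lengths at or near Int.MaxValue fail with an OutOfMemoryError.
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Cap a requested chunk size so the resulting Array[Byte] stays allocatable.
  def safeChunkSize(requestedBytes: Long): Int =
    math.min(requestedBytes, MaxRoundedArrayLength.toLong).toInt
}

// Example: a 3 GiB request is capped to 2147483632 bytes (Int.MaxValue - 15).
// SafeAllocation.safeChunkSize(3L * 1024 * 1024 * 1024)  ==  2147483632
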
17 changes: 10 additions & 7 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -416,8 +416,9 @@ package object config {
.internal()
.doc("The chunk size in bytes during writing out the bytes of ChunkedByteBuffer.")
.bytesConf(ByteUnit.BYTE)
.checkValue(_ <= Int.MaxValue, "The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not larger than Int.MaxValue.")
.checkValue(_ <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
"The chunk size during writing out the bytes of" +
" ChunkedByteBuffer should not larger than Int.MaxValue - 15.")
.createWithDefault(64 * 1024 * 1024)

private[spark] val CHECKPOINT_COMPRESS =
@@ -488,17 +489,19 @@
"otherwise specified. These buffers reduce the number of disk seeks and system calls " +
"made in creating intermediate shuffle files.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
s"The file buffer size must be greater than 0 and less than" +
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE =
ConfigBuilder("spark.shuffle.unsafe.file.output.buffer")
.doc("The file system for this buffer size after each partition " +
"is written in unsafe shuffle writer. In KiB unless otherwise specified.")
.bytesConf(ByteUnit.KiB)
.checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
s"The buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
.checkValue(v => v > 0 && v <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024,
s"The buffer size must be greater than 0 and less than" +
s" ${ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024}.")
.createWithDefaultString("32k")

private[spark] val SHUFFLE_DISK_WRITE_BUFFER_SIZE =
@@ -610,7 +613,7 @@
.internal()
.doc("For testing only, controls the size of chunks when memory mapping a file")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(Int.MaxValue)
.createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

private[spark] val BARRIER_SYNC_TIMEOUT =
ConfigBuilder("spark.barrier.sync.timeout")
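
The KiB-denominated configs above check against ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH / 1024 because the configured value is in KiB and is later converted to a byte count for buffer allocation; the converted value must still fit in a safely allocatable array. A rough standalone illustration of that bound (not the ConfigBuilder code itself, constant mirrored locally):

object KibBufferBound {
  // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Int.MaxValue - 15).
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Validate a buffer size given in KiB and convert it to the byte count to allocate.
  def fileBufferBytes(sizeKib: Long): Int = {
    require(sizeKib > 0 && sizeKib <= MaxRoundedArrayLength / 1024,
      s"The file buffer size must be greater than 0 and less than ${MaxRoundedArrayLength / 1024}.")
    (sizeKib * 1024).toInt  // at most 2147482624 bytes, safely below the array limit
  }
}
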
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -33,6 +33,7 @@ import org.apache.spark.internal.{config, Logging}
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.util.{AbstractFileRegion, JavaUtils}
import org.apache.spark.security.CryptoStreamUtils
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils
import org.apache.spark.util.io.ChunkedByteBuffer

@@ -217,7 +218,7 @@ private[spark] class EncryptedBlockData(
var remaining = blockSize
val chunks = new ListBuffer[ByteBuffer]()
while (remaining > 0) {
val chunkSize = math.min(remaining, Int.MaxValue)
val chunkSize = math.min(remaining, ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
val chunk = allocator(chunkSize.toInt)
remaining -= chunkSize
JavaUtils.readFully(source, chunk)
@@ -235,7 +236,8 @@ private[spark] class EncryptedBlockData(
// This is used by the block transfer service to replicate blocks. The upload code reads
// all bytes into memory to send the block to the remote executor, so it's ok to do this
// as long as the block fits in a Java array.
assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
assert(blockSize <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
"Block is too large to be wrapped in a byte buffer.")
val dst = ByteBuffer.allocate(blockSize.toInt)
val in = open()
try {
@@ -34,6 +34,7 @@ import org.apache.spark.memory.{MemoryManager, MemoryMode}
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
import org.apache.spark.storage._
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.{SizeEstimator, Utils}
import org.apache.spark.util.collection.SizeTrackingVector
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
@@ -333,11 +334,11 @@ private[spark] class MemoryStore(

// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
val chunkSize = if (initialMemoryThreshold > ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
s"${Utils.bytesToString(Int.MaxValue)}")
Int.MaxValue
s"${Utils.bytesToString(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)}")
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
} else {
initialMemoryThreshold.toInt
}
@@ -97,7 +97,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
* @throws UnsupportedOperationException if this buffer's size exceeds the maximum array size.
*/
def toArray: Array[Byte] = {
if (size >= Integer.MAX_VALUE) {
if (size >= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
throw new UnsupportedOperationException(
s"cannot call toArray because buffer size ($size bytes) exceeds maximum array size")
}
@@ -30,6 +30,7 @@ import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeArrayData}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.array.ByteArrayMethods

/**
* Trait for a local matrix.
@@ -456,7 +457,7 @@ object DenseMatrix {
*/
@Since("1.3.0")
def zeros(numRows: Int, numCols: Int): DenseMatrix = {
require(numRows.toLong * numCols <= Int.MaxValue,
require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
s"$numRows x $numCols dense matrix is too large to allocate")
new DenseMatrix(numRows, numCols, new Array[Double](numRows * numCols))
}
@@ -469,7 +470,7 @@
*/
@Since("1.3.0")
def ones(numRows: Int, numCols: Int): DenseMatrix = {
require(numRows.toLong * numCols <= Int.MaxValue,
require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
s"$numRows x $numCols dense matrix is too large to allocate")
new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
}
@@ -499,7 +500,7 @@ object DenseMatrix {
*/
@Since("1.3.0")
def rand(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
require(numRows.toLong * numCols <= Int.MaxValue,
require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
s"$numRows x $numCols dense matrix is too large to allocate")
new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextDouble()))
}
@@ -513,7 +514,7 @@
*/
@Since("1.3.0")
def randn(numRows: Int, numCols: Int, rng: Random): DenseMatrix = {
require(numRows.toLong * numCols <= Int.MaxValue,
require(numRows.toLong * numCols <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
s"$numRows x $numCols dense matrix is too large to allocate")
new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(rng.nextGaussian()))
}
@@ -846,8 +847,8 @@ object SparseMatrix {
s"density must be a double in the range 0.0 <= d <= 1.0. Currently, density: $density")
val size = numRows.toLong * numCols
val expected = size * density
assert(expected < Int.MaxValue,
"The expected number of nonzeros cannot be greater than Int.MaxValue.")
assert(expected < ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH,
"The expected number of nonzeros cannot be greater than Int.MaxValue - 15.")
val nnz = math.ceil(expected).toInt
if (density == 0.0) {
new SparseMatrix(numRows, numCols, new Array[Int](numCols + 1), Array.empty, Array.empty)
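
The DenseMatrix and SparseMatrix checks above compute numRows.toLong * numCols in Long arithmetic so the element count cannot wrap around an Int before it is compared with the limit. A short standalone sketch of the same check (illustrative only, with the limit mirrored locally):

object DenseSizeCheck {
  // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Int.MaxValue - 15), widened to Long.
  val MaxRoundedArrayLength: Long = Int.MaxValue - 15L

  // Reject shapes whose element count cannot be backed by a single Array[Double].
  def checkedLength(numRows: Int, numCols: Int): Int = {
    val elements = numRows.toLong * numCols  // Long math: 50000 * 50000 would overflow an Int
    require(elements <= MaxRoundedArrayLength,
      s"$numRows x $numCols dense matrix is too large to allocate")
    elements.toInt
  }
}
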
@@ -27,7 +27,6 @@ import scala.collection.immutable
import scala.util.matching.Regex

import org.apache.hadoop.fs.Path
import org.tukaani.xz.LZMA2Options

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
@@ -36,6 +35,7 @@ import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -1246,7 +1246,7 @@ object SQLConf {
.doc("Threshold for number of rows guaranteed to be held in memory by the sort merge " +
"join operator")
.intConf
.createWithDefault(Int.MaxValue)
.createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

val SORT_MERGE_JOIN_EXEC_BUFFER_SPILL_THRESHOLD =
buildConf("spark.sql.sortMergeJoinExec.buffer.spill.threshold")
@@ -1480,7 +1480,7 @@ object SQLConf {
"'SELECT x FROM t ORDER BY y LIMIT m', if m is under this threshold, do a top-K sort" +
" in memory, otherwise do a global sort which spills to disk if necessary.")
.intConf
.createWithDefault(Int.MaxValue)
.createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
7 changes: 3 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql

import java.io.CharArrayWriter
import java.sql.{Date, Timestamp}

import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -46,7 +45,6 @@ import org.apache.spark.sql.catalyst.parser.{ParseException, ParserUtils}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.arrow.{ArrowBatchStreamWriter, ArrowConverters}
import org.apache.spark.sql.execution.command._
@@ -57,6 +55,7 @@ import org.apache.spark.sql.streaming.DataStreamWriter
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.SchemaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.types.CalendarInterval
import org.apache.spark.util.Utils

@@ -287,7 +286,7 @@ class Dataset[T] private[sql](
_numRows: Int,
truncate: Int = 20,
vertical: Boolean = false): String = {
val numRows = _numRows.max(0).min(Int.MaxValue - 1)
val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
// Get rows represented by Seq[Seq[String]], we may get one more line if it has more data.
val tmpRows = getRows(numRows, truncate)

@@ -3264,7 +3263,7 @@ class Dataset[T] private[sql](
_numRows: Int,
truncate: Int): Array[Any] = {
EvaluatePython.registerPicklers()
val numRows = _numRows.max(0).min(Int.MaxValue - 1)
val numRows = _numRows.max(0).min(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1)
val rows = getRows(numRows, truncate).map(_.toArray).toArray
val toJava: (Any) => Any = EvaluatePython.toJava(_, ArrayType(ArrayType(StringType)))
val iter: Iterator[Array[Byte]] = new SerDeUtil.AutoBatchedPickler(
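
In the Dataset changes above, the requested row count is clamped to ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH - 1 rather than the full limit; per the comment in the diff, getRows may fetch one extra row to detect whether more data exists, and that extra row must still fit. A tiny illustrative clamp (not the Dataset implementation, constant mirrored locally):

object RowCountClamp {
  // Mirrors ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH (Int.MaxValue - 15).
  val MaxRoundedArrayLength: Int = Int.MaxValue - 15

  // Clamp a user-supplied row count to [0, max - 1], leaving room for the extra
  // "is there more data?" row that the display path may fetch.
  def clampNumRows(requested: Int): Int =
    requested.max(0).min(MaxRoundedArrayLength - 1)
}
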
