diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulable.scala similarity index 54% rename from core/src/main/scala/org/apache/spark/Accumulators.scala rename to core/src/main/scala/org/apache/spark/Accumulable.scala index 5592b75afb75b..a456d420b8d6a 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulable.scala @@ -20,14 +20,12 @@ package org.apache.spark import java.io.{ObjectInputStream, Serializable} import scala.collection.generic.Growable -import scala.collection.Map -import scala.collection.mutable -import scala.ref.WeakReference import scala.reflect.ClassTag import org.apache.spark.serializer.JavaSerializer import org.apache.spark.util.Utils + /** * A data type that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. @@ -166,6 +164,7 @@ class Accumulable[R, T] private[spark] ( override def toString: String = if (value_ == null) "null" else value_.toString } + /** * Helper object defining how to accumulate values of a particular type. An implicit * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. @@ -201,6 +200,7 @@ trait AccumulableParam[R, T] extends Serializable { def zero(initialValue: R): R } + private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] extends AccumulableParam[R, T] { @@ -224,176 +224,3 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa copy } } - -/** - * A simpler value of [[Accumulable]] where the result type being accumulated is the same - * as the types of elements being merged, i.e. variables that are only "added" to through an - * associative operation and can therefore be efficiently supported in parallel. They can be used - * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric - * value types, and programmers can add support for new types. - * - * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. - * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. - * However, they cannot read its value. Only the driver program can read the accumulator's value, - * using its value method. - * - * The interpreter session below shows an accumulator being used to add up the elements of an array: - * - * {{{ - * scala> val accum = sc.accumulator(0) - * accum: spark.Accumulator[Int] = 0 - * - * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) - * ... 
- * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s - * - * scala> accum.value - * res2: Int = 10 - * }}} - * - * @param initialValue initial value of accumulator - * @param param helper object defining how to add elements of type `T` - * @tparam T result type - */ -class Accumulator[T] private[spark] ( - @transient private[spark] val initialValue: T, - param: AccumulatorParam[T], - name: Option[String], - internal: Boolean) - extends Accumulable[T, T](initialValue, param, name, internal) { - - def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { - this(initialValue, param, name, false) - } - - def this(initialValue: T, param: AccumulatorParam[T]) = { - this(initialValue, param, None, false) - } -} - -/** - * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add - * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be - * available when you create Accumulators of a specific type. - * - * @tparam T type of value to accumulate - */ -trait AccumulatorParam[T] extends AccumulableParam[T, T] { - def addAccumulator(t1: T, t2: T): T = { - addInPlace(t1, t2) - } -} - -object AccumulatorParam { - - // The following implicit objects were in SparkContext before 1.2 and users had to - // `import SparkContext._` to enable them. Now we move them here to make the compiler find - // them automatically. However, as there are duplicate codes in SparkContext for backward - // compatibility, please update them accordingly if you modify the following implicit objects. - - implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { - def addInPlace(t1: Double, t2: Double): Double = t1 + t2 - def zero(initialValue: Double): Double = 0.0 - } - - implicit object IntAccumulatorParam extends AccumulatorParam[Int] { - def addInPlace(t1: Int, t2: Int): Int = t1 + t2 - def zero(initialValue: Int): Int = 0 - } - - implicit object LongAccumulatorParam extends AccumulatorParam[Long] { - def addInPlace(t1: Long, t2: Long): Long = t1 + t2 - def zero(initialValue: Long): Long = 0L - } - - implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { - def addInPlace(t1: Float, t2: Float): Float = t1 + t2 - def zero(initialValue: Float): Float = 0f - } - - // TODO: Add AccumulatorParams for other types, e.g. lists and strings -} - -// TODO: The multi-thread support in accumulators is kind of lame; check -// if there's a more intuitive way of doing it right -private[spark] object Accumulators extends Logging { - /** - * This global map holds the original accumulator objects that are created on the driver. - * It keeps weak references to these objects so that accumulators can be garbage-collected - * once the RDDs and user-code that reference them are cleaned up. - */ - val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() - - private var lastId: Long = 0 - - def newId(): Long = synchronized { - lastId += 1 - lastId - } - - def register(a: Accumulable[_, _]): Unit = synchronized { - originals(a.id) = new WeakReference[Accumulable[_, _]](a) - } - - def remove(accId: Long) { - synchronized { - originals.remove(accId) - } - } - - // Add values to the original accumulators with some given IDs - def add(values: Map[Long, Any]): Unit = synchronized { - for ((id, value) <- values) { - if (originals.contains(id)) { - // Since we are now storing weak references, we must check whether the underlying data - // is valid. 
- originals(id).get match { - case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value - case None => - throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") - } - } else { - logWarning(s"Ignoring accumulator update for unknown accumulator id $id") - } - } - } - -} - -private[spark] object InternalAccumulator { - val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" - val TEST_ACCUMULATOR = "testAccumulator" - - // For testing only. - // This needs to be a def since we don't want to reuse the same accumulator across stages. - private def maybeTestAccumulator: Option[Accumulator[Long]] = { - if (sys.props.contains("spark.testing")) { - Some(new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) - } else { - None - } - } - - /** - * Accumulators for tracking internal metrics. - * - * These accumulators are created with the stage such that all tasks in the stage will - * add to the same set of accumulators. We do this to report the distribution of accumulator - * values across all tasks within each stage. - */ - def create(sc: SparkContext): Seq[Accumulator[Long]] = { - val internalAccumulators = Seq( - // Execution memory refers to the memory used by internal data structures created - // during shuffles, aggregations and joins. The value of this accumulator should be - // approximately the sum of the peak sizes across all such data structures created - // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. - new Accumulator( - 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) - ) ++ maybeTestAccumulator.toSeq - internalAccumulators.foreach { accumulator => - sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) - } - internalAccumulators - } -} diff --git a/core/src/main/scala/org/apache/spark/Accumulator.scala b/core/src/main/scala/org/apache/spark/Accumulator.scala new file mode 100644 index 0000000000000..007136e6ae349 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/Accumulator.scala @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.{mutable, Map} +import scala.ref.WeakReference + + +/** + * A simpler value of [[Accumulable]] where the result type being accumulated is the same + * as the types of elements being merged, i.e. variables that are only "added" to through an + * associative operation and can therefore be efficiently supported in parallel. They can be used + * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of numeric + * value types, and programmers can add support for new types. 
+ * + * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. + * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. + * However, they cannot read its value. Only the driver program can read the accumulator's value, + * using its value method. + * + * The interpreter session below shows an accumulator being used to add up the elements of an array: + * + * {{{ + * scala> val accum = sc.accumulator(0) + * accum: spark.Accumulator[Int] = 0 + * + * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) + * ... + * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s + * + * scala> accum.value + * res2: Int = 10 + * }}} + * + * @param initialValue initial value of accumulator + * @param param helper object defining how to add elements of type `T` + * @tparam T result type + */ +class Accumulator[T] private[spark] ( + @transient private[spark] val initialValue: T, + param: AccumulatorParam[T], + name: Option[String], + internal: Boolean) + extends Accumulable[T, T](initialValue, param, name, internal) { + + def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = { + this(initialValue, param, name, false) + } + + def this(initialValue: T, param: AccumulatorParam[T]) = { + this(initialValue, param, None, false) + } +} + + +// TODO: The multi-thread support in accumulators is kind of lame; check +// if there's a more intuitive way of doing it right +private[spark] object Accumulators extends Logging { + /** + * This global map holds the original accumulator objects that are created on the driver. + * It keeps weak references to these objects so that accumulators can be garbage-collected + * once the RDDs and user-code that reference them are cleaned up. + */ + val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]() + + private var lastId: Long = 0 + + def newId(): Long = synchronized { + lastId += 1 + lastId + } + + def register(a: Accumulable[_, _]): Unit = synchronized { + originals(a.id) = new WeakReference[Accumulable[_, _]](a) + } + + def remove(accId: Long) { + synchronized { + originals.remove(accId) + } + } + + // Add values to the original accumulators with some given IDs + def add(values: Map[Long, Any]): Unit = synchronized { + for ((id, value) <- values) { + if (originals.contains(id)) { + // Since we are now storing weak references, we must check whether the underlying data + // is valid. + originals(id).get match { + case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value + case None => + throw new IllegalAccessError("Attempted to access garbage collected Accumulator.") + } + } else { + logWarning(s"Ignoring accumulator update for unknown accumulator id $id") + } + } + } + +} + + +/** + * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add + * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be + * available when you create Accumulators of a specific type. + * + * @tparam T type of value to accumulate + */ +trait AccumulatorParam[T] extends AccumulableParam[T, T] { + def addAccumulator(t1: T, t2: T): T = { + addInPlace(t1, t2) + } +} + + +object AccumulatorParam { + + // The following implicit objects were in SparkContext before 1.2 and users had to + // `import SparkContext._` to enable them. Now we move them here to make the compiler find + // them automatically. 
However, as there are duplicate codes in SparkContext for backward + // compatibility, please update them accordingly if you modify the following implicit objects. + + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { + def addInPlace(t1: Double, t2: Double): Double = t1 + t2 + def zero(initialValue: Double): Double = 0.0 + } + + implicit object IntAccumulatorParam extends AccumulatorParam[Int] { + def addInPlace(t1: Int, t2: Int): Int = t1 + t2 + def zero(initialValue: Int): Int = 0 + } + + implicit object LongAccumulatorParam extends AccumulatorParam[Long] { + def addInPlace(t1: Long, t2: Long): Long = t1 + t2 + def zero(initialValue: Long): Long = 0L + } + + implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { + def addInPlace(t1: Float, t2: Float): Float = t1 + t2 + def zero(initialValue: Float): Float = 0f + } + + // TODO: Add AccumulatorParams for other types, e.g. lists and strings +} diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index b89972836fb05..32ab9b6801bb1 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -18,7 +18,6 @@ package org.apache.spark import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.spark.rdd.RDD import org.apache.spark.storage._ @@ -67,11 +66,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { try { logInfo(s"Partition $key not found, computing it") val computedValues = rdd.computeOrReadCheckpoint(partition, context) - - // Otherwise, cache the values and keep track of any updates in block statuses - val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks) - context.taskMetrics().incUpdatedBlockStatuses(updatedBlocks) + val cachedValues = putInBlockManager(key, computedValues, storageLevel) new InterruptibleIterator(context, cachedValues) } finally { loading.synchronized { @@ -132,7 +127,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { key: BlockId, values: Iterator[T], level: StorageLevel, - updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)], effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = { val putLevel = effectiveStorageLevel.getOrElse(level) @@ -141,8 +135,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * This RDD is not to be cached in memory, so we can just pass the computed values as an * iterator directly to the BlockManager rather than first fully unrolling it in memory. */ - updatedBlocks ++= - blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) + blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel) blockManager.get(key) match { case Some(v) => v.data.asInstanceOf[Iterator[T]] case None => @@ -160,11 +153,10 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { * single partition. Instead, we unroll the values cautiously, potentially aborting and * dropping the partition to disk if applicable. 
*/ - blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match { + blockManager.memoryStore.unrollSafely(key, values) match { case Left(arr) => // We have successfully unrolled the entire partition, so cache it in memory - updatedBlocks ++= - blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) + blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel) arr.iterator.asInstanceOf[Iterator[T]] case Right(it) => // There is not enough space to cache this partition in memory @@ -173,7 +165,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { logWarning(s"Persisting partition $key to disk instead.") val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false, useOffHeap = false, deserialized = false, putLevel.replication) - putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel)) + putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel)) } else { returnValues } diff --git a/core/src/main/scala/org/apache/spark/InternalAccumulator.scala b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala new file mode 100644 index 0000000000000..6ea997c079f33 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/InternalAccumulator.scala @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + + +// This is moved to its own file because many more things will be added to it in SPARK-10620. +private[spark] object InternalAccumulator { + val PEAK_EXECUTION_MEMORY = "peakExecutionMemory" + val TEST_ACCUMULATOR = "testAccumulator" + + // For testing only. + // This needs to be a def since we don't want to reuse the same accumulator across stages. + private def maybeTestAccumulator: Option[Accumulator[Long]] = { + if (sys.props.contains("spark.testing")) { + Some(new Accumulator( + 0L, AccumulatorParam.LongAccumulatorParam, Some(TEST_ACCUMULATOR), internal = true)) + } else { + None + } + } + + /** + * Accumulators for tracking internal metrics. + * + * These accumulators are created with the stage such that all tasks in the stage will + * add to the same set of accumulators. We do this to report the distribution of accumulator + * values across all tasks within each stage. + */ + def create(sc: SparkContext): Seq[Accumulator[Long]] = { + val internalAccumulators = Seq( + // Execution memory refers to the memory used by internal data structures created + // during shuffles, aggregations and joins. The value of this accumulator should be + // approximately the sum of the peak sizes across all such data structures created + // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort. 
+ new Accumulator( + 0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true) + ) ++ maybeTestAccumulator.toSeq + internalAccumulators.foreach { accumulator => + sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator)) + } + internalAccumulators + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala new file mode 100644 index 0000000000000..8f1d7f89a44b4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/InputMetrics.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Method by which input data was read. Network means that the data was read over the network + * from a remote block manager (which may have stored the data on-disk or in-memory). + */ +@DeveloperApi +object DataReadMethod extends Enumeration with Serializable { + type DataReadMethod = Value + val Memory, Disk, Hadoop, Network = Value +} + + +/** + * :: DeveloperApi :: + * Metrics about reading input data. + */ +@DeveloperApi +case class InputMetrics(readMethod: DataReadMethod.Value) { + + /** + * This is volatile so that it is visible to the updater thread. + */ + @volatile @transient var bytesReadCallback: Option[() => Long] = None + + /** + * Total bytes read. + */ + private var _bytesRead: Long = _ + def bytesRead: Long = _bytesRead + def incBytesRead(bytes: Long): Unit = _bytesRead += bytes + + /** + * Total records read. + */ + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + def incRecordsRead(records: Long): Unit = _recordsRead += records + + /** + * Invoke the bytesReadCallback and mutate bytesRead. + */ + def updateBytesRead() { + bytesReadCallback.foreach { c => + _bytesRead = c() + } + } + + /** + * Register a function that can be called to get up-to-date information on how many bytes the task + * has read from an input source. + */ + def setBytesReadCallback(f: Option[() => Long]) { + bytesReadCallback = f + } +} diff --git a/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala new file mode 100644 index 0000000000000..ad132d004cde0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/OutputMetrics.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Method by which output data was written. + */ +@DeveloperApi +object DataWriteMethod extends Enumeration with Serializable { + type DataWriteMethod = Value + val Hadoop = Value +} + + +/** + * :: DeveloperApi :: + * Metrics about writing output data. + */ +@DeveloperApi +case class OutputMetrics(writeMethod: DataWriteMethod.Value) { + /** + * Total bytes written + */ + private var _bytesWritten: Long = _ + def bytesWritten: Long = _bytesWritten + private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value + + /** + * Total records written + */ + private var _recordsWritten: Long = 0L + def recordsWritten: Long = _recordsWritten + private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value +} diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala new file mode 100644 index 0000000000000..e985b35ace623 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleReadMetrics.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Metrics pertaining to shuffle data read in a given task. 
+ */ +@DeveloperApi +class ShuffleReadMetrics extends Serializable { + /** + * Number of remote blocks fetched in this shuffle by this task + */ + private var _remoteBlocksFetched: Int = _ + def remoteBlocksFetched: Int = _remoteBlocksFetched + private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value + private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + + /** + * Number of local blocks fetched in this shuffle by this task + */ + private var _localBlocksFetched: Int = _ + def localBlocksFetched: Int = _localBlocksFetched + private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value + private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value + + /** + * Time the task spent waiting for remote shuffle blocks. This only includes the time + * blocking on shuffle input data. For instance if block B is being fetched while the task is + * still not finished processing block A, it is not considered to be blocking on block B. + */ + private var _fetchWaitTime: Long = _ + def fetchWaitTime: Long = _fetchWaitTime + private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value + private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value + + /** + * Total number of remote bytes read from the shuffle by this task + */ + private var _remoteBytesRead: Long = _ + def remoteBytesRead: Long = _remoteBytesRead + private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value + private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value + + /** + * Shuffle data that was read from the local disk (as opposed to from a remote executor). + */ + private var _localBytesRead: Long = _ + def localBytesRead: Long = _localBytesRead + private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value + + /** + * Total bytes fetched in the shuffle by this task (both remote and local). + */ + def totalBytesRead: Long = _remoteBytesRead + _localBytesRead + + /** + * Number of blocks fetched in this shuffle by this task (remote or local) + */ + def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched + + /** + * Total number of records read from the shuffle by this task + */ + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + private[spark] def incRecordsRead(value: Long) = _recordsRead += value + private[spark] def decRecordsRead(value: Long) = _recordsRead -= value +} diff --git a/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala new file mode 100644 index 0000000000000..469ebe26c7b56 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/ShuffleWriteMetrics.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.annotation.DeveloperApi + + +/** + * :: DeveloperApi :: + * Metrics pertaining to shuffle data written in a given task. + */ +@DeveloperApi +class ShuffleWriteMetrics extends Serializable { + /** + * Number of bytes written for the shuffle by this task + */ + @volatile private var _shuffleBytesWritten: Long = _ + def shuffleBytesWritten: Long = _shuffleBytesWritten + private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value + private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value + + /** + * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds + */ + @volatile private var _shuffleWriteTime: Long = _ + def shuffleWriteTime: Long = _shuffleWriteTime + private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value + private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value + + /** + * Total number of records written to the shuffle by this task + */ + @volatile private var _shuffleRecordsWritten: Long = _ + def shuffleRecordsWritten: Long = _shuffleRecordsWritten + private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value + private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value + private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value +} diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index e67159155a761..2202e33090e71 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -27,6 +27,7 @@ import org.apache.spark.executor.DataReadMethod.DataReadMethod import org.apache.spark.storage.{BlockId, BlockStatus} import org.apache.spark.util.Utils + /** * :: DeveloperApi :: * Metrics tracked during the execution of a task. @@ -326,6 +327,7 @@ class TaskMetrics extends Serializable { } } + private[spark] object TaskMetrics { private val hostNameCache = new ConcurrentHashMap[String, String]() @@ -336,187 +338,3 @@ private[spark] object TaskMetrics { if (canonicalHost != null) canonicalHost else host } } - -/** - * :: DeveloperApi :: - * Method by which input data was read. Network means that the data was read over the network - * from a remote block manager (which may have stored the data on-disk or in-memory). - */ -@DeveloperApi -object DataReadMethod extends Enumeration with Serializable { - type DataReadMethod = Value - val Memory, Disk, Hadoop, Network = Value -} - -/** - * :: DeveloperApi :: - * Method by which output data was written. - */ -@DeveloperApi -object DataWriteMethod extends Enumeration with Serializable { - type DataWriteMethod = Value - val Hadoop = Value -} - -/** - * :: DeveloperApi :: - * Metrics about reading input data. - */ -@DeveloperApi -case class InputMetrics(readMethod: DataReadMethod.Value) { - - /** - * This is volatile so that it is visible to the updater thread. - */ - @volatile @transient var bytesReadCallback: Option[() => Long] = None - - /** - * Total bytes read. - */ - private var _bytesRead: Long = _ - def bytesRead: Long = _bytesRead - def incBytesRead(bytes: Long): Unit = _bytesRead += bytes - - /** - * Total records read. 
- */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - def incRecordsRead(records: Long): Unit = _recordsRead += records - - /** - * Invoke the bytesReadCallback and mutate bytesRead. - */ - def updateBytesRead() { - bytesReadCallback.foreach { c => - _bytesRead = c() - } - } - - /** - * Register a function that can be called to get up-to-date information on how many bytes the task - * has read from an input source. - */ - def setBytesReadCallback(f: Option[() => Long]) { - bytesReadCallback = f - } -} - -/** - * :: DeveloperApi :: - * Metrics about writing output data. - */ -@DeveloperApi -case class OutputMetrics(writeMethod: DataWriteMethod.Value) { - /** - * Total bytes written - */ - private var _bytesWritten: Long = _ - def bytesWritten: Long = _bytesWritten - private[spark] def setBytesWritten(value : Long): Unit = _bytesWritten = value - - /** - * Total records written - */ - private var _recordsWritten: Long = 0L - def recordsWritten: Long = _recordsWritten - private[spark] def setRecordsWritten(value: Long): Unit = _recordsWritten = value -} - -/** - * :: DeveloperApi :: - * Metrics pertaining to shuffle data read in a given task. - */ -@DeveloperApi -class ShuffleReadMetrics extends Serializable { - /** - * Number of remote blocks fetched in this shuffle by this task - */ - private var _remoteBlocksFetched: Int = _ - def remoteBlocksFetched: Int = _remoteBlocksFetched - private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value - - /** - * Number of local blocks fetched in this shuffle by this task - */ - private var _localBlocksFetched: Int = _ - def localBlocksFetched: Int = _localBlocksFetched - private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value - - /** - * Time the task spent waiting for remote shuffle blocks. This only includes the time - * blocking on shuffle input data. For instance if block B is being fetched while the task is - * still not finished processing block A, it is not considered to be blocking on block B. - */ - private var _fetchWaitTime: Long = _ - def fetchWaitTime: Long = _fetchWaitTime - private[spark] def incFetchWaitTime(value: Long) = _fetchWaitTime += value - private[spark] def decFetchWaitTime(value: Long) = _fetchWaitTime -= value - - /** - * Total number of remote bytes read from the shuffle by this task - */ - private var _remoteBytesRead: Long = _ - def remoteBytesRead: Long = _remoteBytesRead - private[spark] def incRemoteBytesRead(value: Long) = _remoteBytesRead += value - private[spark] def decRemoteBytesRead(value: Long) = _remoteBytesRead -= value - - /** - * Shuffle data that was read from the local disk (as opposed to from a remote executor). - */ - private var _localBytesRead: Long = _ - def localBytesRead: Long = _localBytesRead - private[spark] def incLocalBytesRead(value: Long) = _localBytesRead += value - - /** - * Total bytes fetched in the shuffle by this task (both remote and local). 
- */ - def totalBytesRead: Long = _remoteBytesRead + _localBytesRead - - /** - * Number of blocks fetched in this shuffle by this task (remote or local) - */ - def totalBlocksFetched: Int = _remoteBlocksFetched + _localBlocksFetched - - /** - * Total number of records read from the shuffle by this task - */ - private var _recordsRead: Long = _ - def recordsRead: Long = _recordsRead - private[spark] def incRecordsRead(value: Long) = _recordsRead += value - private[spark] def decRecordsRead(value: Long) = _recordsRead -= value -} - -/** - * :: DeveloperApi :: - * Metrics pertaining to shuffle data written in a given task. - */ -@DeveloperApi -class ShuffleWriteMetrics extends Serializable { - /** - * Number of bytes written for the shuffle by this task - */ - @volatile private var _shuffleBytesWritten: Long = _ - def shuffleBytesWritten: Long = _shuffleBytesWritten - private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value - private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value - - /** - * Time the task spent blocking on writes to disk or buffer cache, in nanoseconds - */ - @volatile private var _shuffleWriteTime: Long = _ - def shuffleWriteTime: Long = _shuffleWriteTime - private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value - private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value - - /** - * Total number of records written to the shuffle by this task - */ - @volatile private var _shuffleRecordsWritten: Long = _ - def shuffleRecordsWritten: Long = _shuffleRecordsWritten - private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value - private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value - private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value -} diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 33f8b9f16c11b..b5adbd88a2c23 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -19,10 +19,8 @@ package org.apache.spark.memory import javax.annotation.concurrent.GuardedBy -import scala.collection.mutable - import org.apache.spark.{Logging, SparkConf} -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} +import org.apache.spark.storage.{BlockId, MemoryStore} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.memory.MemoryAllocator @@ -67,17 +65,11 @@ private[spark] abstract class MemoryManager( storageMemoryPool.setMemoryStore(store) } - // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) - /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * @return whether all N bytes were successfully granted. */ - def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -85,14 +77,10 @@ private[spark] abstract class MemoryManager( * This extra method allows subclasses to differentiate behavior between acquiring storage * memory and acquiring unroll memory. 
For instance, the memory management model in Spark * 1.5 and before places a limit on the amount of space that can be freed from unrolling. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * * @return whether all N bytes were successfully granted. */ - def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean /** * Try to acquire up to `numBytes` of execution memory for the current task and return the diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 3554b558f2123..f9f8f820bc49c 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -17,10 +17,8 @@ package org.apache.spark.memory -import scala.collection.mutable - import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.storage.BlockId /** * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. @@ -53,24 +51,18 @@ private[spark] class StaticMemoryManager( (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { if (numBytes > maxStorageMemory) { // Fail fast if the block simply won't fit logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " + s"memory limit ($maxStorageMemory bytes)") false } else { - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes) } } - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory val freeMemory = storageMemoryPool.memoryFree // When unrolling, we will use all of the existing free memory, and, if necessary, @@ -80,7 +72,7 @@ private[spark] class StaticMemoryManager( val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory) // Keep it within the range 0 <= X <= maxNumBytesToFree val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory)) - storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree) } private[memory] diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala index 79411e7eb6f10..6a88966f60d23 100644 --- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -19,11 +19,8 @@ package org.apache.spark.memory import javax.annotation.concurrent.GuardedBy -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer - -import org.apache.spark.{Logging, TaskContext} -import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} +import 
org.apache.spark.Logging +import org.apache.spark.storage.{BlockId, MemoryStore} /** * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage @@ -58,15 +55,11 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. * @return whether all N bytes were successfully granted. */ - def acquireMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized { val numBytesToFree = math.max(0, numBytes - memoryFree) - acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) + acquireMemory(blockId, numBytes, numBytesToFree) } /** @@ -80,15 +73,12 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w def acquireMemory( blockId: BlockId, numBytesToAcquire: Long, - numBytesToFree: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + numBytesToFree: Long): Boolean = lock.synchronized { assert(numBytesToAcquire >= 0) assert(numBytesToFree >= 0) assert(memoryUsed <= poolSize) if (numBytesToFree > 0) { - memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, evictedBlocks) - // Register evicted blocks, if any, with the active task metrics - Option(TaskContext.get()).foreach(_.taskMetrics().incUpdatedBlockStatuses(evictedBlocks)) + memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree) } // NOTE: If the memory store evicts blocks, then those evictions will synchronously call // back into this StorageMemoryPool in order to free memory. Therefore, these variables @@ -125,9 +115,7 @@ private[memory] class StorageMemoryPool(lock: Object) extends MemoryPool(lock) w val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory if (remainingSpaceToFree > 0) { // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: - val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, evictedBlocks) - val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum + val spaceFreedByEviction = memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree) // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do // not need to decrement _memoryUsed here. However, we do need to decrement the pool size. 
decrementPoolSize(spaceFreedByEviction) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index 57a24ac140287..a3321e3f179f6 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -17,10 +17,8 @@ package org.apache.spark.memory -import scala.collection.mutable - import org.apache.spark.SparkConf -import org.apache.spark.storage.{BlockId, BlockStatus} +import org.apache.spark.storage.BlockId /** * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that @@ -133,10 +131,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( } } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) if (numBytes > maxStorageMemory) { @@ -152,14 +147,11 @@ private[spark] class UnifiedMemoryManager private[memory] ( onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution) storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution) } - storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes) } - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes, evictedBlocks) + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes) } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e49d79b8ad66e..77fd03a6bcfc5 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -612,12 +612,16 @@ private[spark] class BlockManager( None } + /** + * @return true if the block was stored or false if the block was already stored or an + * error occurred. + */ def putIterator( blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(values != null, "Values is null") doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel) } @@ -641,28 +645,32 @@ private[spark] class BlockManager( /** * Put a new block of values to the block manager. - * Return a list of blocks updated as a result of this put. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ def putArray( blockId: BlockId, values: Array[Any], level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(values != null, "Values is null") doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel) } /** * Put a new block of serialized bytes to the block manager. 
- * Return a list of blocks updated as a result of this put. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ def putBytes( blockId: BlockId, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(bytes != null, "Bytes is null") doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel) } @@ -674,14 +682,16 @@ private[spark] class BlockManager( * The effective storage level refers to the level according to which the block will actually be * handled. This allows the caller to specify an alternate behavior of doPut while preserving * the original level specified by the user. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ private def doPut( blockId: BlockId, data: BlockValues, level: StorageLevel, tellMaster: Boolean = true, - effectiveStorageLevel: Option[StorageLevel] = None) - : Seq[(BlockId, BlockStatus)] = { + effectiveStorageLevel: Option[StorageLevel] = None): Boolean = { require(blockId != null, "BlockId is null") require(level != null && level.isValid, "StorageLevel is null or invalid") @@ -689,9 +699,6 @@ private[spark] class BlockManager( require(level != null && level.isValid, "Effective StorageLevel is null or invalid") } - // Return value - val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - /* Remember the block's storage level so that we can correctly drop it to disk if it needs * to be dropped right after it got put into memory. Note, however, that other threads will * not be able to get() this block until we call markReady on its BlockInfo. */ @@ -702,7 +709,7 @@ private[spark] class BlockManager( if (oldBlockOpt.isDefined) { if (oldBlockOpt.get.waitForReady()) { logWarning(s"Block $blockId already exists on this machine; not re-adding it") - return updatedBlocks + return false } // TODO: So the block info exists - but previous attempt to load it (?) failed. // What do we do now ? Retry on it ? 
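// Editor's sketch (illustrative only, not part of the patch): with the hunk above, the
// BlockManager put* methods now report only whether the block was stored, as a Boolean, and
// the per-block status updates are recorded on the active task's TaskMetrics inside doPut via
// incUpdatedBlockStatuses rather than being returned to the caller. A minimal, hypothetical
// caller adapting to the new contract might look like the following, assuming `bm: BlockManager`,
// `blockId: BlockId`, `values: Iterator[Any]` and a Logging mixin (for logWarning) are in scope:

val stored: Boolean = bm.putIterator(blockId, values, StorageLevel.MEMORY_AND_DISK)
if (!stored) {
  // false means the block already existed on this executor or the put failed
  logWarning(s"Block $blockId was not stored: it already existed or an error occurred")
}

// Callers that previously consumed the returned Seq[(BlockId, BlockStatus)] would instead rely
// on the statuses accumulated through TaskMetrics.incUpdatedBlockStatuses for the running task.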
@@ -743,11 +750,12 @@ private[spark] class BlockManager( case _ => null } + var marked = false + putBlockInfo.synchronized { logTrace("Put for block %s took %s to get into synchronized block" .format(blockId, Utils.getUsedTimeMs(startTimeMs))) - var marked = false try { // returnValues - Whether to return the values put // blockStore - The type of storage to put these values into @@ -783,11 +791,6 @@ private[spark] class BlockManager( case _ => } - // Keep track of which blocks are dropped from memory - if (putLevel.useMemory) { - result.droppedBlocks.foreach { updatedBlocks += _ } - } - val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo) if (putBlockStatus.storageLevel != StorageLevel.NONE) { // Now that the block is in either the memory, externalBlockStore, or disk store, @@ -797,7 +800,9 @@ private[spark] class BlockManager( if (tellMaster) { reportBlockStatus(blockId, putBlockInfo, putBlockStatus) } - updatedBlocks += ((blockId, putBlockStatus)) + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, putBlockStatus))) + } } } finally { // If we failed in putting the block to memory/disk, notify other possible readers @@ -847,7 +852,7 @@ private[spark] class BlockManager( .format(blockId, Utils.getUsedTimeMs(startTimeMs))) } - updatedBlocks + marked } /** @@ -967,32 +972,27 @@ private[spark] class BlockManager( /** * Write a block consisting of a single object. + * + * @return true if the block was stored or false if the block was already stored or an + * error occurred. */ def putSingle( blockId: BlockId, value: Any, level: StorageLevel, - tellMaster: Boolean = true): Seq[(BlockId, BlockStatus)] = { + tellMaster: Boolean = true): Boolean = { putIterator(blockId, Iterator(value), level, tellMaster) } - def dropFromMemory( - blockId: BlockId, - data: Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { - dropFromMemory(blockId, () => data) - } - /** * Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * store reaches its limit and needs to free up space. * * If `data` is not put on disk, it won't be created. - * - * Return the block status if the given block has been updated, else None. */ def dropFromMemory( blockId: BlockId, - data: () => Either[Array[Any], ByteBuffer]): Option[BlockStatus] = { + data: () => Either[Array[Any], ByteBuffer]): Unit = { logInfo(s"Dropping block $blockId from memory") val info = blockInfo.get(blockId) @@ -1005,10 +1005,10 @@ private[spark] class BlockManager( if (!info.waitForReady()) { // If we get here, the block write failed. logWarning(s"Block $blockId was marked as failure. 
Nothing to drop") - return None + return } else if (blockInfo.asScala.get(blockId).isEmpty) { logWarning(s"Block $blockId was already dropped.") - return None + return } var blockIsUpdated = false val level = info.level @@ -1044,11 +1044,12 @@ private[spark] class BlockManager( blockInfo.remove(blockId) } if (blockIsUpdated) { - return Some(status) + Option(TaskContext.get()).foreach { c => + c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status))) + } } } } - None } /** diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index bdab8c2332fae..76aaa782b9524 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -95,9 +95,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val values = blockManager.dataDeserialize(blockId, bytes) putIterator(blockId, values, level, returnValues = true) } else { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) + tryToPut(blockId, bytes, bytes.limit, deserialized = false) + PutResult(bytes.limit(), Right(bytes.duplicate())) } } @@ -110,8 +109,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = { // Work on a duplicate - since the original input might be used elsewhere. lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer] - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false, droppedBlocks) + val putSuccess = tryToPut(blockId, () => bytes, size, deserialized = false) val data = if (putSuccess) { assert(bytes.limit == size) @@ -119,7 +117,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { null } - PutResult(size, data, droppedBlocks) + PutResult(size, data) } override def putArray( @@ -127,15 +125,14 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo values: Array[Any], level: StorageLevel, returnValues: Boolean): PutResult = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] if (level.deserialized) { val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef]) - tryToPut(blockId, values, sizeEstimate, deserialized = true, droppedBlocks) - PutResult(sizeEstimate, Left(values.iterator), droppedBlocks) + tryToPut(blockId, values, sizeEstimate, deserialized = true) + PutResult(sizeEstimate, Left(values.iterator)) } else { val bytes = blockManager.dataSerialize(blockId, values.iterator) - tryToPut(blockId, bytes, bytes.limit, deserialized = false, droppedBlocks) - PutResult(bytes.limit(), Right(bytes.duplicate()), droppedBlocks) + tryToPut(blockId, bytes, bytes.limit, deserialized = false) + PutResult(bytes.limit(), Right(bytes.duplicate())) } } @@ -165,22 +162,20 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo level: StorageLevel, returnValues: Boolean, allowPersistToDisk: Boolean): PutResult = { - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val unrolledValues = unrollSafely(blockId, values, droppedBlocks) + val unrolledValues = unrollSafely(blockId, values) unrolledValues match { case Left(arrayValues) => // Values are 
fully unrolled in memory, so store them as an array val res = putArray(blockId, arrayValues, level, returnValues) - droppedBlocks ++= res.droppedBlocks - PutResult(res.size, res.data, droppedBlocks) + PutResult(res.size, res.data) case Right(iteratorValues) => // Not enough space to unroll this block; drop to disk if applicable if (level.useDisk && allowPersistToDisk) { logWarning(s"Persisting block $blockId to disk instead.") val res = blockManager.diskStore.putIterator(blockId, iteratorValues, level, returnValues) - PutResult(res.size, res.data, droppedBlocks) + PutResult(res.size, res.data) } else { - PutResult(0, Left(iteratorValues), droppedBlocks) + PutResult(0, Left(iteratorValues)) } } } @@ -246,11 +241,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * This method returns either an array with the contents of the entire block or an iterator * containing the values of the block (if the array would have exceeded available memory). */ - def unrollSafely( - blockId: BlockId, - values: Iterator[Any], - droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)]) - : Either[Array[Any], Iterator[Any]] = { + def unrollSafely(blockId: BlockId, values: Iterator[Any]): Either[Array[Any], Iterator[Any]] = { // Number of elements unrolled so far var elementsUnrolled = 0 @@ -270,7 +261,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo var vector = new SizeTrackingVector[Any] // Request enough memory to begin unrolling - keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, droppedBlocks) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold) if (!keepUnrolling) { logWarning(s"Failed to reserve initial memory threshold of " + @@ -286,8 +277,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo val currentSize = vector.estimateSize() if (currentSize >= memoryThreshold) { val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong - keepUnrolling = reserveUnrollMemoryForThisTask( - blockId, amountToRequest, droppedBlocks) + keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest) // New threshold is currentSize * memoryGrowthFactor memoryThreshold += amountToRequest } @@ -337,9 +327,8 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, value: Any, size: Long, - deserialized: Boolean, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - tryToPut(blockId, () => value, size, deserialized, droppedBlocks) + deserialized: Boolean): Boolean = { + tryToPut(blockId, () => value, size, deserialized) } /** @@ -355,16 +344,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * blocks to free memory for one block, another thread may use up the freed space for * another block. * - * All blocks evicted in the process, if any, will be added to `droppedBlocks`. - * * @return whether put was successful. */ private def tryToPut( blockId: BlockId, value: () => Any, size: Long, - deserialized: Boolean, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + deserialized: Boolean): Boolean = { /* TODO: Its possible to optimize the locking by locking entries only when selecting blocks * to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has @@ -380,7 +366,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo // happen atomically. 
This relies on the assumption that all memory acquisitions are // synchronized on the same lock. releasePendingUnrollMemoryForThisTask() - val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) + val enoughMemory = memoryManager.acquireStorageMemory(blockId, size) if (enoughMemory) { // We acquired enough memory for the block, so go ahead and put it val entry = new MemoryEntry(value(), size, deserialized) @@ -398,8 +384,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { Right(value().asInstanceOf[ByteBuffer].duplicate()) } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + blockManager.dropFromMemory(blockId, () => data) } enoughMemory } @@ -413,13 +398,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * * @param blockId the ID of the block we are freeing space for, if any * @param space the size of this block - * @param droppedBlocks a holder for blocks evicted in the process - * @return whether the requested free space is freed. + * @return the amount of memory (in bytes) freed by eviction */ - private[spark] def evictBlocksToFreeSpace( - blockId: Option[BlockId], - space: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = { assert(space > 0) memoryManager.synchronized { var freedMemory = 0L @@ -453,17 +434,16 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } else { Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) } - val droppedBlockStatus = blockManager.dropFromMemory(blockId, data) - droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) } + blockManager.dropFromMemory(blockId, () => data) } } - true + freedMemory } else { blockId.foreach { id => logInfo(s"Will not store $id as it would require dropping another block " + "from the same RDD") } - false + 0L } } } @@ -481,12 +461,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * Reserve memory for unrolling the given block for this task. * @return whether the request is granted. 
*/ - def reserveUnrollMemoryForThisTask( - blockId: BlockId, - memory: Long, - droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = { memoryManager.synchronized { - val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) + val success = memoryManager.acquireUnrollMemory(blockId, memory) if (success) { val taskAttemptId = currentTaskAttemptId() unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory diff --git a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala index 7c8529f687d4c..48a0282b30cf0 100644 --- a/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/CacheManagerSuite.scala @@ -85,7 +85,12 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before test("verify task metrics updated correctly") { cacheManager = sc.env.cacheManager val context = TaskContext.empty() - cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) - assert(context.taskMetrics.updatedBlockStatuses.size === 2) + try { + TaskContext.setTaskContext(context) + cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY) + assert(context.taskMetrics.updatedBlockStatuses.size === 2) + } finally { + TaskContext.unset() + } } } diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 3b2368798c1dd..d9764c7c10983 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -70,8 +70,7 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft */ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = { val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS) - when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())) - .thenAnswer(evictBlocksToFreeSpaceAnswer(mm)) + when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm)) mm.setMemoryStore(ms) ms } @@ -89,9 +88,9 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft * records the number of bytes this is called with. This variable is expected to be cleared * by the test code later through [[assertEvictBlocksToFreeSpaceCalled]]. 
*/ - private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Boolean] = { - new Answer[Boolean] { - override def answer(invocation: InvocationOnMock): Boolean = { + private def evictBlocksToFreeSpaceAnswer(mm: MemoryManager): Answer[Long] = { + new Answer[Long] { + override def answer(invocation: InvocationOnMock): Long = { val args = invocation.getArguments val numBytesToFree = args(1).asInstanceOf[Long] assert(numBytesToFree > 0) @@ -101,20 +100,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft if (numBytesToFree <= mm.storageMemoryUsed) { // We can evict enough blocks to fulfill the request for space mm.releaseStorageMemory(numBytesToFree) - args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( + evictedBlocks.append( (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))) - // We need to add this call so that that the suite-level `evictedBlocks` is updated when - // execution evicts storage; in that case, args.last will not be equal to evictedBlocks - // because it will be a temporary buffer created inside of the MemoryManager rather than - // being passed in by the test code. - if (!(evictedBlocks eq args.last)) { - evictedBlocks.append( - (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytesToFree, 0L))) - } - true + numBytesToFree } else { // No blocks were evicted because eviction would not free enough space. - false + 0L } } } diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 68cf26fc3ed5d..eee78d396e147 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -81,22 +81,22 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val dummyBlock = TestBlockId("you can see the world you brought to live") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 10L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -107,12 +107,12 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) 
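For context on the assertions in these memory-manager suites: acquireStorageMemory and acquireUnrollMemory no longer take an evictedBlocks buffer, and evictBlocksToFreeSpace now reports how many bytes it actually freed instead of a Boolean. A minimal sketch of how a caller can interpret the new return value (the wrapper and its name are hypothetical, not part of this patch):

    // Hypothetical helper: ask the memory store to free `space` bytes and report
    // whether the request was fully satisfied. evictBlocksToFreeSpace returns the
    // number of bytes freed by eviction, or 0L if no suitable block could be dropped.
    def evictedEnough(memoryStore: MemoryStore, space: Long): Boolean =
      memoryStore.evictBlocksToFreeSpace(blockId = None, space = space) >= space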
assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -134,7 +134,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 50L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) @@ -152,21 +152,21 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { val maxStorageMem = 1000L val dummyBlock = TestBlockId("lonely water") val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireUnrollMemory(dummyBlock, 100L)) when(ms.currentUnrollMemory).thenReturn(100L) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 800L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 860L) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // As of this point, cache memory is 800 bytes and current unroll memory is 60 bytes. // Requesting 240 more bytes of unroll memory will leave our total unroll memory at // 300 bytes, still under the 400-byte limit. Therefore, all 240 bytes are granted. - assert(mm.acquireUnrollMemory(dummyBlock, 240L, evictedBlocks)) + assert(mm.acquireUnrollMemory(dummyBlock, 240L)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) // 860 + 240 - 1000 when(ms.currentUnrollMemory).thenReturn(300L) // 60 + 240 assert(mm.storageMemoryUsed === 1000L) @@ -174,7 +174,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { // We already have 300 bytes of unroll memory, so requesting 150 more will leave us // above the 400-byte limit. Since there is not enough free memory, this request will // fail even after evicting as much as we can (400 - 300 = 100 bytes). 
- assert(!mm.acquireUnrollMemory(dummyBlock, 150L, evictedBlocks)) + assert(!mm.acquireUnrollMemory(dummyBlock, 150L)) assertEvictBlocksToFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 900L) // Release beyond what was acquired diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 4a1e49b45df40..e5cb9d3a99f0b 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -41,14 +41,8 @@ class TestMemoryManager(conf: SparkConf) grant } } - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def acquireUnrollMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true + override def acquireStorageMemory(blockId: BlockId, numBytes: Long): Boolean = true + override def acquireUnrollMemory(blockId: BlockId, numBytes: Long): Boolean = true override def releaseStorageMemory(numBytes: Long): Unit = {} override private[memory] def releaseExecutionMemory( numBytes: Long, diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 6cc48597d38f9..0c4359c3c2cd5 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -74,24 +74,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val maxMemory = 1000L val (mm, ms) = makeThings(maxMemory) assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 10L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 10L) - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire more than the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 110L) // Acquire up to the max, requests after this are still granted due to LRU eviction - assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, maxMemory)) assertEvictBlocksToFreeSpaceCalled(ms, 110L) assert(mm.storageMemoryUsed === 1000L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceCalled(ms, 1L) assert(evictedBlocks.nonEmpty) evictedBlocks.clear() @@ -102,12 +102,12 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 201L) mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) - assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) 
+ assert(mm.acquireStorageMemory(dummyBlock, 1L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired @@ -120,7 +120,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region - assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 750L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) @@ -140,7 +140,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region - assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 400L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 400L) assert(mm.executionMemoryUsed === 300L) @@ -157,7 +157,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) // Acquire enough storage memory to exceed the storage region size - assert(mm.acquireStorageMemory(dummyBlock, 700L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 700L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 700L) @@ -182,11 +182,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 100L)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) assertEvictBlocksToFreeSpaceNotCalled(ms) - assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 250L)) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) // Do not attempt to evict blocks, since evicting will not free enough memory: @@ -199,11 +199,11 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.storageMemoryUsed === 0L) assertEvictBlocksToFreeSpaceNotCalled(ms) // Storage should still not be able to evict execution - assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 750L)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) assertEvictBlocksToFreeSpaceNotCalled(ms) // since there were 800 bytes free - assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) + assert(!mm.acquireStorageMemory(dummyBlock, 850L)) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 750L) // Do not attempt to evict blocks, since evicting will not free enough memory: @@ -243,7 +243,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L) assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L) // Fill up all of the remaining memory with storage. 
- assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks)) + assert(mm.acquireStorageMemory(dummyBlock, 800L)) assertEvictBlocksToFreeSpaceNotCalled(ms) assert(mm.storageMemoryUsed === 800) assert(mm.executionMemoryUsed === 200) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 0f3156117004b..e1b2c9633edca 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -184,8 +184,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(master.getLocations("a3").size === 0, "master was told about a3") // Drop a1 and a2 from memory; this should be reported back to the master - store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store") assert(master.getLocations("a1").size === 0, "master did not remove a1") @@ -425,8 +425,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE t2.join() t3.join() - store.dropFromMemory("a1", null: Either[Array[Any], ByteBuffer]) - store.dropFromMemory("a2", null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a1", () => null: Either[Array[Any], ByteBuffer]) + store.dropFromMemory("a2", () => null: Either[Array[Any], ByteBuffer]) store.waitForAsyncReregister() } } @@ -847,23 +847,37 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val list = List.fill(2)(new Array[Byte](2000)) val bigList = List.fill(8)(new Array[Byte](2000)) + def getUpdatedBlocks(task: => Unit): Seq[(BlockId, BlockStatus)] = { + val context = TaskContext.empty() + try { + TaskContext.setTaskContext(context) + task + } finally { + TaskContext.unset() + } + context.taskMetrics.updatedBlockStatuses + } + // 1 updated block (i.e. list1) - val updatedBlocks1 = + val updatedBlocks1 = getUpdatedBlocks { store.putIterator("list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } assert(updatedBlocks1.size === 1) assert(updatedBlocks1.head._1 === TestBlockId("list1")) assert(updatedBlocks1.head._2.storageLevel === StorageLevel.MEMORY_ONLY) // 1 updated block (i.e. 
list2) - val updatedBlocks2 = + val updatedBlocks2 = getUpdatedBlocks { store.putIterator("list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true) + } assert(updatedBlocks2.size === 1) assert(updatedBlocks2.head._1 === TestBlockId("list2")) assert(updatedBlocks2.head._2.storageLevel === StorageLevel.MEMORY_ONLY) // 2 updated blocks - list1 is kicked out of memory while list3 is added - val updatedBlocks3 = + val updatedBlocks3 = getUpdatedBlocks { store.putIterator("list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } assert(updatedBlocks3.size === 2) updatedBlocks3.foreach { case (id, status) => id match { @@ -875,8 +889,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.memoryStore.contains("list3"), "list3 was not in memory store") // 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added - val updatedBlocks4 = + val updatedBlocks4 = getUpdatedBlocks { store.putIterator("list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } assert(updatedBlocks4.size === 2) updatedBlocks4.foreach { case (id, status) => id match { @@ -889,8 +904,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(store.memoryStore.contains("list4"), "list4 was not in memory store") // No updated blocks - list5 is too big to fit in store and nothing is kicked out - val updatedBlocks5 = + val updatedBlocks5 = getUpdatedBlocks { store.putIterator("list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true) + } assert(updatedBlocks5.size === 0) // memory store contains only list3 and list4 @@ -1005,8 +1021,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE assert(memoryStore.currentUnrollMemoryForThisTask === 0) def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask( - TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)]) + memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) } // Reserve @@ -1062,11 +1077,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val smallList = List.fill(40)(new Array[Byte](100)) val bigList = List.fill(40)(new Array[Byte](1000)) val memoryStore = store.memoryStore - val droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] assert(memoryStore.currentUnrollMemoryForThisTask === 0) // Unroll with all the space in the world. This should succeed and return an array. - var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + var unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) memoryStore.releasePendingUnrollMemoryForThisTask() @@ -1074,24 +1088,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE // Unroll with not enough space. This should succeed after kicking out someBlock1. 
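The updatedBlocks assertions above work because the put path no longer returns block statuses; BlockManager now appends them to the metrics of whichever task is currently installed. The reporting idiom, lifted from the put/drop paths earlier in this diff and wrapped in a hypothetical helper for illustration:

    import org.apache.spark.TaskContext
    import org.apache.spark.storage.{BlockId, BlockStatus}

    // If a TaskContext is installed (i.e. we are running inside a task), record the
    // updated block status in its metrics; outside a task this is a silent no-op.
    def reportUpdatedBlock(blockId: BlockId, status: BlockStatus): Unit = {
      Option(TaskContext.get()).foreach { c =>
        c.taskMetrics().incUpdatedBlockStatuses(Seq((blockId, status)))
      }
    }

This is also why the tests install a context via TaskContext.setTaskContext and read context.taskMetrics.updatedBlockStatuses afterwards. (The unroll test whose comment precedes this note continues below.)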
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY) store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator, droppedBlocks) + unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator) verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true) assert(memoryStore.currentUnrollMemoryForThisTask === 0) - assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock1")) - droppedBlocks.clear() + assert(memoryStore.contains("someBlock2")) + assert(!memoryStore.contains("someBlock1")) memoryStore.releasePendingUnrollMemoryForThisTask() // Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 = // 4800 bytes, there is still not enough room to unroll this block. This returns an iterator. // In the mean time, however, we kicked out someBlock2 before giving up. store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY) - unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator, droppedBlocks) + unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator) verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false) assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator - assert(droppedBlocks.size === 1) - assert(droppedBlocks.head._1 === TestBlockId("someBlock2")) - droppedBlocks.clear() + assert(!memoryStore.contains("someBlock2")) } test("safely unroll blocks through putIterator") { @@ -1238,7 +1249,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE }) assert(result.size === 13000) assert(result.data === null) - assert(result.droppedBlocks === Nil) } test("put a small ByteBuffer to MemoryStore") { @@ -1252,6 +1262,5 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE }) assert(result.size === 10000) assert(result.data === Right(bytes)) - assert(result.droppedBlocks === Nil) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala index 716bc63e00995..7ff5ad143f80b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorAssembler.scala @@ -70,19 +70,19 @@ class VectorAssembler(override val uid: String) val group = AttributeGroup.fromStructField(field) if (group.attributes.isDefined) { // If attributes are defined, copy them with updated names. - group.attributes.get.map { attr => + group.attributes.get.zipWithIndex.map { case (attr, i) => if (attr.name.isDefined) { // TODO: Define a rigorous naming scheme. attr.withName(c + "_" + attr.name.get) } else { - attr + attr.withName(c + "_" + i) } } } else { // Otherwise, treat all attributes as numeric. If we cannot get the number of attributes // from metadata, check the first row. 
val numAttrs = group.numAttributes.getOrElse(first.getAs[Vector](index).size) - Array.fill(numAttrs)(NumericAttribute.defaultAttr) + Array.tabulate(numAttrs)(i => NumericAttribute.defaultAttr.withName(c + "_" + i)) } case otherType => throw new SparkException(s"VectorAssembler does not support the $otherType type") diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala index dc20a5ec2152d..16e565d8b588b 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/RFormulaSuite.scala @@ -143,6 +143,44 @@ class RFormulaSuite extends SparkFunSuite with MLlibTestSparkContext { assert(attrs === expectedAttrs) } + test("vector attribute generation") { + val formula = new RFormula().setFormula("id ~ vec") + val original = sqlContext.createDataFrame( + Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0))) + ).toDF("id", "vec") + val model = formula.fit(original) + val result = model.transform(original) + val attrs = AttributeGroup.fromStructField(result.schema("features")) + val expectedAttrs = new AttributeGroup( + "features", + Array[Attribute]( + new NumericAttribute(Some("vec_0"), Some(1)), + new NumericAttribute(Some("vec_1"), Some(2)))) + assert(attrs === expectedAttrs) + } + + test("vector attribute generation with unnamed input attrs") { + val formula = new RFormula().setFormula("id ~ vec2") + val base = sqlContext.createDataFrame( + Seq((1, Vectors.dense(0.0, 1.0)), (2, Vectors.dense(1.0, 2.0))) + ).toDF("id", "vec") + val metadata = new AttributeGroup( + "vec2", + Array[Attribute]( + NumericAttribute.defaultAttr, + NumericAttribute.defaultAttr)).toMetadata + val original = base.select(base.col("id"), base.col("vec").as("vec2", metadata)) + val model = formula.fit(original) + val result = model.transform(original) + val attrs = AttributeGroup.fromStructField(result.schema("features")) + val expectedAttrs = new AttributeGroup( + "features", + Array[Attribute]( + new NumericAttribute(Some("vec2_0"), Some(1)), + new NumericAttribute(Some("vec2_1"), Some(2)))) + assert(attrs === expectedAttrs) + } + test("numeric interaction") { val formula = new RFormula().setFormula("a ~ b:c:d") val original = sqlContext.createDataFrame( diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala index f7de7c1e93fb2..dce994fdbd056 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/VectorAssemblerSuite.scala @@ -111,8 +111,8 @@ class VectorAssemblerSuite assert(userGenderOut === user.getAttr("gender").withName("user_gender").withIndex(3)) val userSalaryOut = features.getAttr(4) assert(userSalaryOut === user.getAttr("salary").withName("user_salary").withIndex(4)) - assert(features.getAttr(5) === NumericAttribute.defaultAttr.withIndex(5)) - assert(features.getAttr(6) === NumericAttribute.defaultAttr.withIndex(6)) + assert(features.getAttr(5) === NumericAttribute.defaultAttr.withIndex(5).withName("ad_0")) + assert(features.getAttr(6) === NumericAttribute.defaultAttr.withIndex(6).withName("ad_1")) } test("read/write") { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala index 
2737fe32cd086..7df3787e6d2d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercion.scala @@ -27,10 +27,20 @@ import org.apache.spark.sql.types._ /** - * A collection of [[Rule Rules]] that can be used to coerce differing types that - * participate in operations into compatible ones. Most of these rules are based on Hive semantics, - * but they do not introduce any dependencies on the hive codebase. For this reason they remain in - * Catalyst until we have a more standard set of coercions. + * A collection of [[Rule Rules]] that can be used to coerce differing types that participate in + * operations into compatible ones. + * + * Most of these rules are based on Hive semantics, but they do not introduce any dependencies on + * the hive codebase. + * + * Notes about type widening / tightest common types: Broadly, there are two cases when we need + * to widen data types (e.g. union, binary comparison). In case 1, we are looking for a common + * data type for two or more data types, and in this case no loss of precision is allowed. Examples + * include type inference in JSON (e.g. what's the column's data type if one row is an integer + * while the other row is a long?). In case 2, we are looking for a widened data type with + * some acceptable loss of precision (e.g. there is no common type for double and decimal because + * double's range is larger than decimal, and yet decimal is more precise than double, but in + * union we would cast the decimal into double). */ object HiveTypeCoercion { @@ -63,6 +73,8 @@ object HiveTypeCoercion { DoubleType) /** + * Case 1 type widening (see the classdoc comment above for HiveTypeCoercion). + * * Find the tightest common type of two types that might be used in a binary expression. * This handles all numeric types except fixed-precision decimals interacting with each other or * with primitive types, because in that case the precision and scale of the result depends on @@ -118,6 +130,12 @@ object HiveTypeCoercion { }) } + /** + * Case 2 type widening (see the classdoc comment above for HiveTypeCoercion). + * + * i.e. the main difference with [[findTightestCommonTypeOfTwo]] is that here we allow some + * loss of precision when widening decimal and double. 
+ */ private def findWiderTypeForTwo(t1: DataType, t2: DataType): Option[DataType] = (t1, t2) match { case (t1: DecimalType, t2: DecimalType) => Some(DecimalPrecision.widerDecimalType(t1, t2)) @@ -125,9 +143,7 @@ object HiveTypeCoercion { Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d)) case (d: DecimalType, t: IntegralType) => Some(DecimalPrecision.widerDecimalType(DecimalType.forType(t), d)) - case (t: FractionalType, d: DecimalType) => - Some(DoubleType) - case (d: DecimalType, t: FractionalType) => + case (_: FractionalType, _: DecimalType) | (_: DecimalType, _: FractionalType) => Some(DoubleType) case _ => findTightestCommonTypeToString(t1, t2) @@ -200,41 +216,37 @@ object HiveTypeCoercion { */ object WidenSetOperationTypes extends Rule[LogicalPlan] { - private[this] def widenOutputTypes( - planName: String, - left: LogicalPlan, - right: LogicalPlan): (LogicalPlan, LogicalPlan) = { - require(left.output.length == right.output.length) + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case p if p.analyzed => p - val castedTypes = left.output.zip(right.output).map { - case (lhs, rhs) if lhs.dataType != rhs.dataType => - findWiderTypeForTwo(lhs.dataType, rhs.dataType) - case other => None - } + case s @ SetOperation(left, right) if s.childrenResolved + && left.output.length == right.output.length && !s.resolved => - def castOutput(plan: LogicalPlan): LogicalPlan = { - val casted = plan.output.zip(castedTypes).map { - case (e, Some(dt)) if e.dataType != dt => - Alias(Cast(e, dt), e.name)() - case (e, _) => e + // Tracks the list of data types to widen. + // Some(dataType) means the right-hand side and the left-hand side have different types, + // and there is a target type to widen both sides to. + val targetTypes: Seq[Option[DataType]] = left.output.zip(right.output).map { + case (lhs, rhs) if lhs.dataType != rhs.dataType => + findWiderTypeForTwo(lhs.dataType, rhs.dataType) + case other => None } - Project(casted, plan) - } - if (castedTypes.exists(_.isDefined)) { - (castOutput(left), castOutput(right)) - } else { - (left, right) - } + if (targetTypes.exists(_.isDefined)) { + // There is at least one column to widen. + s.makeCopy(Array(widenTypes(left, targetTypes), widenTypes(right, targetTypes))) + } else { + // If we cannot find any column to widen, then just return the original set. + s + } } - def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case p if p.analyzed => p - - case s @ SetOperation(left, right) if s.childrenResolved - && left.output.length == right.output.length && !s.resolved => - val (newLeft, newRight) = widenOutputTypes(s.nodeName, left, right) - s.makeCopy(Array(newLeft, newRight)) + /** Given a plan, add an extra project on top to widen some columns' data types. 
*/ + private def widenTypes(plan: LogicalPlan, targetTypes: Seq[Option[DataType]]): LogicalPlan = { + val casted = plan.output.zip(targetTypes).map { + case (e, Some(dt)) if e.dataType != dt => Alias(Cast(e, dt), e.name)() + case (e, _) => e + } + Project(casted, plan) } } @@ -372,8 +384,6 @@ object HiveTypeCoercion { * - INT gets turned into DECIMAL(10, 0) * - LONG gets turned into DECIMAL(20, 0) * - FLOAT and DOUBLE cause fixed-length decimals to turn into DOUBLE - * - * Note: Union/Except/Interact is handled by WidenTypes */ // scalastyle:on object DecimalPrecision extends Rule[LogicalPlan] { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala index b1f6c0b802d8e..b326aa9c55992 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/HiveTypeCoercionSuite.scala @@ -387,7 +387,7 @@ class HiveTypeCoercionSuite extends PlanTest { ) } - test("WidenSetOperationTypes for union except and intersect") { + test("WidenSetOperationTypes for union, except, and intersect") { def checkOutput(logical: LogicalPlan, expectTypes: Seq[DataType]): Unit = { logical.output.zip(expectTypes).foreach { case (attr, dt) => assert(attr.dataType === dt) @@ -499,7 +499,6 @@ class HiveTypeCoercionSuite extends PlanTest { ruleTest(dateTimeOperations, Subtract(interval, interval), Subtract(interval, interval)) } - /** * There are rules that need to not fire before child expressions get resolved. * We use this test to make sure those rules do not fire early. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala index dfe33ba8b0502..af76ff91a267c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveUDFSuite.scala @@ -22,7 +22,7 @@ import java.util.{ArrayList, Arrays, Properties} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.udf.UDAFPercentile -import org.apache.hadoop.hive.ql.udf.generic.{GenericUDAFAverage, GenericUDF, GenericUDFOPAnd, GenericUDTFExplode} +import org.apache.hadoop.hive.ql.udf.generic._ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject import org.apache.hadoop.hive.serde2.{AbstractSerDe, SerDeStats} import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorFactory} @@ -351,10 +351,14 @@ class HiveUDFSuite extends QueryTest with TestHiveSingleton with SQLTestUtils { } test("Hive UDF in group by") { - Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1") - val count = sql("select date(cast(test_date as timestamp))" + - " from tab1 group by date(cast(test_date as timestamp))").count() - assert(count == 1) + withTempTable("tab1") { + Seq(Tuple1(1451400761)).toDF("test_date").registerTempTable("tab1") + sql(s"CREATE TEMPORARY FUNCTION testUDFToDate AS '${classOf[GenericUDFToDate].getName}'") + val count = sql("select testUDFToDate(cast(test_date as timestamp))" + + " from tab1 group by testUDFToDate(cast(test_date as timestamp))").count() + sql("DROP TEMPORARY FUNCTION IF EXISTS testUDFToDate") + assert(count == 1) + } } test("SPARK-11522 select input_file_name from non-parquet table"){ diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala index faa5aca1d8f7a..e22e320b17126 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceivedBlockHandler.scala @@ -71,7 +71,7 @@ private[streaming] class BlockManagerBasedBlockHandler( var numRecords: Option[Long] = None - val putResult: Seq[(BlockId, BlockStatus)] = block match { + val putSucceeded: Boolean = block match { case ArrayBufferBlock(arrayBuffer) => numRecords = Some(arrayBuffer.size.toLong) blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, @@ -88,7 +88,7 @@ private[streaming] class BlockManagerBasedBlockHandler( throw new SparkException( s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}") } - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") } @@ -184,9 +184,9 @@ private[streaming] class WriteAheadLogBasedBlockHandler( // Store the block in block manager val storeInBlockManagerFuture = Future { - val putResult = + val putSucceeded = blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) - if (!putResult.map { _._1 }.contains(blockId)) { + if (!putSucceeded) { throw new SparkException( s"Could not store $blockId to block manager with storage level $storageLevel") }
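With putIterator and putBytes now returning a success flag rather than a sequence of updated block statuses, the receiver-side handlers above reduce to a plain Boolean check. A minimal sketch of that pattern (the wrapper name is hypothetical; only the BlockManager call and the SparkException message come from this patch):

    import java.nio.ByteBuffer
    import org.apache.spark.SparkException
    import org.apache.spark.storage.{BlockId, BlockManager, StorageLevel}

    // Hypothetical helper mirroring BlockManagerBasedBlockHandler: store the bytes
    // and fail loudly if the BlockManager could not keep the block at this level.
    def storeOrThrow(
        blockManager: BlockManager,
        blockId: BlockId,
        bytes: ByteBuffer,
        level: StorageLevel): Unit = {
      val putSucceeded = blockManager.putBytes(blockId, bytes, level, tellMaster = true)
      if (!putSucceeded) {
        throw new SparkException(
          s"Could not store $blockId to block manager with storage level $level")
      }
    }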