From 07ad91b02ad2e644788a7e432472e8c5384a29c6 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 13:17:49 +0800 Subject: [PATCH 01/13] add exterlnal sorter for takeOrdered function --- .../main/scala/org/apache/spark/rdd/RDD.scala | 30 +++++++++---------- .../apache/spark/util/collection/Utils.scala | 23 +++++++++----- .../apache/spark/sql/execution/limit.scala | 7 +++-- 3 files changed, 35 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 10b5f8291a03a..5e8b4aefdeb52 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -31,6 +31,7 @@ import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ +import org.apache.spark.serializer.Serializer import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.api.java.JavaRDD @@ -493,8 +494,7 @@ abstract class RDD[T: ClassTag]( * * @param weights weights for splits, will be normalized if they don't sum to 1 * @param seed random seed - * - * @return split RDDs in an array + * @return split RDDs in an array */ def randomSplit( weights: Array[Double], @@ -517,7 +517,8 @@ abstract class RDD[T: ClassTag]( /** * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability * range. - * @param lb lower bound to use for the Bernoulli sampler + * + * @param lb lower bound to use for the Bernoulli sampler * @param ub upper bound to use for the Bernoulli sampler * @param seed the seed for the Random number generator * @return A random sub-sample of the RDD without replacement. @@ -535,8 +536,7 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * - * @param withReplacement whether sampling is done with replacement + * @param withReplacement whether sampling is done with replacement * @param num size of the returned sample * @param seed seed for the random number generator * @return sample of specified size in an array @@ -1291,8 +1291,7 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * - * @note due to complications in the internal implementation, this method will raise + * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { @@ -1356,8 +1355,7 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * - * @param num k, the number of top elements to return + * @param num k, the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */ @@ -1379,19 +1377,19 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. 
- * - * @param num k, the number of elements to return + * @param num k, the number of elements to return * @param ord the implicit ordering for T * @return an array of top elements */ - def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { + def takeOrdered(num: Int, ser: Serializer = SparkEnv.get.serializer) + (implicit ord: Ordering[T]): Array[T] = withScope { if (num == 0) { Array.empty } else { val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered(items, num)(ord) + queue ++= util.collection.Utils.takeOrdered[T](items, num, ser)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { @@ -1407,7 +1405,8 @@ abstract class RDD[T: ClassTag]( /** * Returns the max of this RDD as defined by the implicit Ordering[T]. - * @return the maximum element of the RDD + * + * @return the maximum element of the RDD * */ def max()(implicit ord: Ordering[T]): T = withScope { this.reduce(ord.max) @@ -1415,7 +1414,8 @@ abstract class RDD[T: ClassTag]( /** * Returns the min of this RDD as defined by the implicit Ordering[T]. - * @return the minimum element of the RDD + * + * @return the minimum element of the RDD * */ def min()(implicit ord: Ordering[T]): T = withScope { this.reduce(ord.min) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 4939b600dbfbd..14db03eeff4d3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -17,9 +17,11 @@ package org.apache.spark.util.collection -import scala.collection.JavaConverters._ +import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.serializer.Serializer +import org.apache.spark.InternalAccumulator +import org.apache.spark.util.CompletionIterator -import com.google.common.collect.{Ordering => GuavaOrdering} /** * Utility functions for collections. @@ -30,10 +32,17 @@ private[spark] object Utils { * Returns the first K elements from the input as defined by the specified implicit Ordering[T] * and maintains the ordering. 
*/ - def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = { - val ordering = new GuavaOrdering[T] { - override def compare(l: T, r: T): Int = ord.compare(l, r) - } - ordering.leastOf(input.asJava, num).iterator.asScala + def takeOrdered[T](input: Iterator[T], num: Int, + ser: Serializer = SparkEnv.get.serializer)(implicit ord: Ordering[T]): Iterator[T] = { + val context = TaskContext.get() + val sorter = + new ExternalSorter[T, Any, Any](context, None, None, Some(ord), Some(ser)) + sorter.insertAll(input.map(x => (x, null))) + context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) + context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) + context.taskMetrics().incSpillTime(sorter.spillTime) + context.internalMetricsToAccumulators( + InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes) + CompletionIterator[T, Iterator[T]](sorter.iterator.map(_._1).take(num), sorter.stop()) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 781c016095427..28f332238c3b3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange +import org.apache.spark.util import org.apache.spark.util.Utils @@ -125,7 +126,7 @@ case class TakeOrderedAndProjectExec( override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) - val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) + val data = child.execute().map(_.copy()).takeOrdered(limit, serializer)(ord) if (projectList.isDefined) { val proj = UnsafeProjection.create(projectList.get, child.output) data.map(r => proj(r).copy()) @@ -140,14 +141,14 @@ case class TakeOrderedAndProjectExec( val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => - org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord) + util.collection.Utils.takeOrdered(iter, limit, serializer)(ord) } } val shuffled = new ShuffledRowRDD( ShuffleExchange.prepareShuffleDependency( localTopK, child.output, SinglePartition, serializer)) shuffled.mapPartitions { iter => - val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) + val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), limit, serializer)(ord) if (projectList.isDefined) { val proj = UnsafeProjection.create(projectList.get, child.output) topK.map(r => proj(r)) From 8c483ad4d219114e3ad84705be3d07760a5f7059 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 13:42:12 +0800 Subject: [PATCH 02/13] fix compile errors --- .../scala/org/apache/spark/util/collection/Utils.scala | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 14db03eeff4d3..c0b627ebe068e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ 
b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -19,7 +19,6 @@ package org.apache.spark.util.collection import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.serializer.Serializer -import org.apache.spark.InternalAccumulator import org.apache.spark.util.CompletionIterator @@ -36,13 +35,11 @@ private[spark] object Utils { ser: Serializer = SparkEnv.get.serializer)(implicit ord: Ordering[T]): Iterator[T] = { val context = TaskContext.get() val sorter = - new ExternalSorter[T, Any, Any](context, None, None, Some(ord), Some(ser)) + new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser) sorter.insertAll(input.map(x => (x, null))) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) - context.taskMetrics().incSpillTime(sorter.spillTime) - context.internalMetricsToAccumulators( - InternalAccumulator.PEAK_EXECUTION_MEMORY).add(sorter.peakMemoryUsedBytes) + context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) CompletionIterator[T, Iterator[T]](sorter.iterator.map(_._1).take(num), sorter.stop()) } } From 40d639562661b863cb12d3a1840b885815de7998 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 14:11:34 +0800 Subject: [PATCH 03/13] fix compile errors --- project/MimaExcludes.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 16f26e7d283b4..c372c9e2538a7 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -65,6 +65,8 @@ object MimaExcludes { "org.apache.spark.status.api.v1.ApplicationAttemptInfo.$default$5"), // SPARK-14042 Add custom coalescer support ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.coalesce"), + // SPARK-17488 Add external sorter for takeOrdered + ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.takeOrdered"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.rdd.PartitionCoalescer$LocationIterator"), ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.rdd.PartitionCoalescer"), // SPARK-15532 Remove isRootContext flag from SQLContext. From 07a8f240ca2c0c46df9fc6cae99c24e0bb21e55c Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Sat, 10 Sep 2016 16:20:59 +0800 Subject: [PATCH 04/13] 1.remove unnecessary changes 2.handle the case when TaskContext is null --- .../main/scala/org/apache/spark/rdd/RDD.scala | 24 +++++++++-------- .../apache/spark/util/collection/Utils.scala | 26 +++++++++++++------ 2 files changed, 31 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 5e8b4aefdeb52..f8d5dce7c0fe1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -494,7 +494,8 @@ abstract class RDD[T: ClassTag]( * * @param weights weights for splits, will be normalized if they don't sum to 1 * @param seed random seed - * @return split RDDs in an array + * + * @return split RDDs in an array */ def randomSplit( weights: Array[Double], @@ -517,8 +518,7 @@ abstract class RDD[T: ClassTag]( /** * Internal method exposed for Random Splits in DataFrames. Samples an RDD given a probability * range. 
- * - * @param lb lower bound to use for the Bernoulli sampler + * @param lb lower bound to use for the Bernoulli sampler * @param ub upper bound to use for the Bernoulli sampler * @param seed the seed for the Random number generator * @return A random sub-sample of the RDD without replacement. @@ -536,7 +536,8 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * @param withReplacement whether sampling is done with replacement + * + * @param withReplacement whether sampling is done with replacement * @param num size of the returned sample * @param seed seed for the random number generator * @return sample of specified size in an array @@ -1291,7 +1292,8 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * @note due to complications in the internal implementation, this method will raise + * + * @note due to complications in the internal implementation, this method will raise * an exception if called on an RDD of `Nothing` or `Null`. */ def take(num: Int): Array[T] = withScope { @@ -1355,7 +1357,8 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * @param num k, the number of top elements to return + * + * @param num k, the number of top elements to return * @param ord the implicit ordering for T * @return an array of top elements */ @@ -1377,7 +1380,8 @@ abstract class RDD[T: ClassTag]( * * @note this method should only be used if the resulting array is expected to be small, as * all the data is loaded into the driver's memory. - * @param num k, the number of elements to return + * + * @param num k, the number of elements to return * @param ord the implicit ordering for T * @return an array of top elements */ @@ -1405,8 +1409,7 @@ abstract class RDD[T: ClassTag]( /** * Returns the max of this RDD as defined by the implicit Ordering[T]. - * - * @return the maximum element of the RDD + * @return the maximum element of the RDD * */ def max()(implicit ord: Ordering[T]): T = withScope { this.reduce(ord.max) @@ -1414,8 +1417,7 @@ abstract class RDD[T: ClassTag]( /** * Returns the min of this RDD as defined by the implicit Ordering[T]. - * - * @return the minimum element of the RDD + * @return the minimum element of the RDD * */ def min()(implicit ord: Ordering[T]): T = withScope { this.reduce(ord.min) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index c0b627ebe068e..2a5607cafe764 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -17,11 +17,14 @@ package org.apache.spark.util.collection +import scala.collection.JavaConverters._ + +import com.google.common.collect.{Ordering => GuavaOrdering} + import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.util.CompletionIterator - /** * Utility functions for collections. 
*/ @@ -34,12 +37,19 @@ private[spark] object Utils { def takeOrdered[T](input: Iterator[T], num: Int, ser: Serializer = SparkEnv.get.serializer)(implicit ord: Ordering[T]): Iterator[T] = { val context = TaskContext.get() - val sorter = - new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser) - sorter.insertAll(input.map(x => (x, null))) - context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) - context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) - context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) - CompletionIterator[T, Iterator[T]](sorter.iterator.map(_._1).take(num), sorter.stop()) + if (context == null) { + val ordering = new GuavaOrdering[T] { + override def compare(l: T, r: T): Int = ord.compare(l, r) + } + ordering.leastOf(input.asJava, num).iterator.asScala + } else { + val sorter = + new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser) + sorter.insertAll(input.map(x => (x, null))) + context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) + context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) + context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) + CompletionIterator[T, Iterator[T]](sorter.iterator.map(_._1).take(num), sorter.stop()) + } } } From 44fc619235e3d6b682f1bedd6dd79cdf3dc5bdc3 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 15 Sep 2016 18:45:42 +0800 Subject: [PATCH 05/13] Merge remote-tracking branch 'remotes/apache/master' into SPARK-17488 --- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 245b16564cca7..cca83b109a95d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -127,7 +127,7 @@ case class TakeOrderedAndProjectExec( override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) - val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) + val data = child.execute().map(_.copy()).takeOrdered(limit, serializer)(ord) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) data.map(r => proj(r).copy()) @@ -142,14 +142,14 @@ case class TakeOrderedAndProjectExec( val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => - org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord) + util.collection.Utils.takeOrdered(iter, limit, serializer)(ord) } } val shuffled = new ShuffledRowRDD( ShuffleExchange.prepareShuffleDependency( localTopK, child.output, SinglePartition, serializer)) shuffled.mapPartitions { iter => - val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord) + val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), limit, serializer)(ord) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) topK.map(r => proj(r)) From 079f6b86d0ce0adb41ae1f61dcfe1ba43043f66d Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 22 Sep 2016 19:28:23 +0800 Subject: [PATCH 06/13] 1.revert changes in RDD 2.judge when to use external sorter --- .../scala/org/apache/spark/SparkContext.scala | 2 +- 
.../main/scala/org/apache/spark/rdd/RDD.scala | 6 +-- .../apache/spark/util/collection/Utils.scala | 47 ++++++++++++++----- project/MimaExcludes.scala | 2 - .../datasources/DataSourceStrategy.scala | 10 ++-- .../apache/spark/sql/execution/limit.scala | 4 +- .../org/apache/spark/sql/DataFrameSuite.scala | 14 +++++- 7 files changed, 61 insertions(+), 24 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1981ad5671093..ec02147e933d5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -484,7 +484,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // The Mesos scheduler backend relies on this environment variable to set executor memory. // TODO: Set this only in the Mesos scheduler. executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" - executorEnvs ++= _conf.getExecutorEnv + executorEnvs ++= _conf.getExecutorEnvj executorEnvs("SPARK_USER") = sparkUser // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index f5c9783b46032..6dc334ceb52ea 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -31,7 +31,6 @@ import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.TextOutputFormat import org.apache.spark._ -import org.apache.spark.serializer.Serializer import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.api.java.JavaRDD @@ -1385,15 +1384,14 @@ abstract class RDD[T: ClassTag]( * @param ord the implicit ordering for T * @return an array of top elements */ - def takeOrdered(num: Int, ser: Serializer = SparkEnv.get.serializer) - (implicit ord: Ordering[T]): Array[T] = withScope { + def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope { if (num == 0) { Array.empty } else { val mapRDDs = mapPartitions { items => // Priority keeps the largest elements, so let's reverse the ordering. val queue = new BoundedPriorityQueue[T](num)(ord.reverse) - queue ++= util.collection.Utils.takeOrdered[T](items, num, ser)(ord) + queue ++= util.collection.Utils.takeOrdered(items, num)(ord) Iterator.single(queue) } if (mapRDDs.partitions.length == 0) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 2a5607cafe764..9cca219ec394e 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -23,7 +23,7 @@ import com.google.common.collect.{Ordering => GuavaOrdering} import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.serializer.Serializer -import org.apache.spark.util.CompletionIterator +import org.apache.spark.util.{CompletionIterator, SizeEstimator} /** * Utility functions for collections. @@ -34,18 +34,43 @@ private[spark] object Utils { * Returns the first K elements from the input as defined by the specified implicit Ordering[T] * and maintains the ordering. 
*/ - def takeOrdered[T](input: Iterator[T], num: Int, - ser: Serializer = SparkEnv.get.serializer)(implicit ord: Ordering[T]): Iterator[T] = { + def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = { + val ordering = new GuavaOrdering[T] { + override def compare(l: T, r: T): Int = ord.compare(l, r) + } + ordering.leastOf(input.asJava, num).iterator.asScala + } + + /** + * Returns the first K elements from the input as defined by the specified implicit Ordering[T] + * and maintains the ordering. + */ + def takeOrdered[T](input: Iterator[T], num: Int, ser: Serializer) + (implicit ord: Ordering[T]): Iterator[T] = { val context = TaskContext.get() - if (context == null) { - val ordering = new GuavaOrdering[T] { - override def compare(l: T, r: T): Int = ord.compare(l, r) - } - ordering.leastOf(input.asJava, num).iterator.asScala + if (context == null || !input.hasNext) { + return takeOrdered(input, num)(ord) + } + + val iter = input.buffered + var size = SizeEstimator.estimate(iter.head) + if (size == 0) { + size = 1024 + } + + val executorMemory = SparkEnv.get.conf.getOption("spark.executor.memory") + .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) + .orElse(Option(System.getenv("SPARK_MEM"))) + .map(Utils.memoryStringToMb) + .getOrElse(1024L) * 1024 * 1024 + + val limit = (executorMemory / size) * 0.1 + + if (num < limit) { + takeOrdered(iter, num)(ord) } else { - val sorter = - new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser) - sorter.insertAll(input.map(x => (x, null))) + val sorter = new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser) + sorter.insertAll(iter.map(x => (x, null))) context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled) context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled) context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes) diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 9a018655c2a7b..f13f3ff789484 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -70,8 +70,6 @@ object MimaExcludes { "org.apache.spark.status.api.v1.ApplicationAttemptInfo.$default$5"), // SPARK-14042 Add custom coalescer support ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.coalesce"), - // SPARK-17488 Add external sorter for takeOrdered - ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.rdd.RDD.takeOrdered"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.rdd.PartitionCoalescer$LocationIterator"), ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.rdd.PartitionCoalescer"), // SPARK-15532 Remove isRootContext flag from SQLContext. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index c8ad5b303491f..63f01c5bb9e3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -197,7 +197,10 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] { * source information. 
*/ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] { - private def readDataSourceTable(sparkSession: SparkSession, table: CatalogTable): LogicalPlan = { + private def readDataSourceTable( + sparkSession: SparkSession, + simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = { + val table = simpleCatalogRelation.catalogTable val dataSource = DataSource( sparkSession, @@ -209,16 +212,17 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] LogicalRelation( dataSource.resolveRelation(), + expectedOutputAttributes = Some(simpleCatalogRelation.output), catalogTable = Some(table)) } override def apply(plan: LogicalPlan): LogicalPlan = plan transform { case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _) if DDLUtils.isDatasourceTable(s.metadata) => - i.copy(table = readDataSourceTable(sparkSession, s.metadata)) + i.copy(table = readDataSourceTable(sparkSession, s)) case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) => - readDataSourceTable(sparkSession, s.metadata) + readDataSourceTable(sparkSession, s) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index cca83b109a95d..9a7a77c1e832a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCo import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.exchange.ShuffleExchange import org.apache.spark.util -import org.apache.spark.util.Utils +import org.apache.spark.util.{SizeEstimator, Utils} /** @@ -127,7 +127,7 @@ case class TakeOrderedAndProjectExec( override def executeCollect(): Array[InternalRow] = { val ord = new LazilyGeneratedOrdering(sortOrder, child.output) - val data = child.execute().map(_.copy()).takeOrdered(limit, serializer)(ord) + val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) data.map(r => proj(r).copy()) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c2d256bdd335b..2c60a7dd9209b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -26,7 +26,8 @@ import scala.util.Random import org.scalatest.Matchers._ import org.apache.spark.SparkException -import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Union} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project, Union} import org.apache.spark.sql.execution.QueryExecution import org.apache.spark.sql.execution.aggregate.HashAggregateExec import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchange} @@ -1585,4 +1586,15 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val d = sampleDf.withColumn("c", monotonically_increasing_id).select($"c").collect assert(d.size == d.distinct.size) } + + test("SPARK-17625: data source table in InMemoryCatalog should guarantee output consistency") { + val tableName = "tbl" + withTable(tableName) { + spark.range(10).select('id as 'i, 'id as 
'j).write.saveAsTable(tableName) + val relation = spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)) + val expr = relation.resolve("i") + val qe = spark.sessionState.executePlan(Project(Seq(expr), relation)) + qe.assertAnalyzed() + } + } } From dd82d5eae0da848dbd9680018547420f338a85d8 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 22 Sep 2016 20:12:03 +0800 Subject: [PATCH 07/13] 1.Merge latest code 2.clean code --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../scala/org/apache/spark/util/collection/Utils.scala | 9 ++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ec02147e933d5..1981ad5671093 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -484,7 +484,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli // The Mesos scheduler backend relies on this environment variable to set executor memory. // TODO: Set this only in the Mesos scheduler. executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m" - executorEnvs ++= _conf.getExecutorEnvj + executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 9cca219ec394e..23f3aee3b4c1d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -21,7 +21,7 @@ import scala.collection.JavaConverters._ import com.google.common.collect.{Ordering => GuavaOrdering} -import org.apache.spark.{SparkEnv, TaskContext} +import org.apache.spark.{TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.util.{CompletionIterator, SizeEstimator} @@ -58,12 +58,7 @@ private[spark] object Utils { size = 1024 } - val executorMemory = SparkEnv.get.conf.getOption("spark.executor.memory") - .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) - .orElse(Option(System.getenv("SPARK_MEM"))) - .map(Utils.memoryStringToMb) - .getOrElse(1024L) * 1024 * 1024 - + val executorMemory = Runtime.getRuntime.maxMemory() val limit = (executorMemory / size) * 0.1 if (num < limit) { From d1d0afe47a400b43b951fbb9d3c147ed7bb7f02e Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Thu, 22 Sep 2016 21:11:32 +0800 Subject: [PATCH 08/13] change scala style --- .../main/scala/org/apache/spark/util/collection/Utils.scala | 3 ++- .../main/scala/org/apache/spark/sql/execution/limit.scala | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 23f3aee3b4c1d..4de48e278cc24 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import scala.collection.JavaConverters._ +import scala.reflect.ClassTag import com.google.common.collect.{Ordering => GuavaOrdering} @@ -45,7 +46,7 @@ private[spark] object Utils { * Returns the first K elements from the input as defined by the specified implicit Ordering[T] * and maintains the ordering. 
*/ - def takeOrdered[T](input: Iterator[T], num: Int, ser: Serializer) + def takeOrdered[T: ClassTag](input: Iterator[T], num: Int, ser: Serializer) (implicit ord: Ordering[T]): Iterator[T] = { val context = TaskContext.get() if (context == null || !input.hasNext) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 9a7a77c1e832a..7fa10d3b226e8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -142,14 +142,15 @@ case class TakeOrderedAndProjectExec( val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => - util.collection.Utils.takeOrdered(iter, limit, serializer)(ord) + util.collection.Utils.takeOrdered(iter, limit, serializer)(classTag[InternalRow], ord) } } val shuffled = new ShuffledRowRDD( ShuffleExchange.prepareShuffleDependency( localTopK, child.output, SinglePartition, serializer)) shuffled.mapPartitions { iter => - val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), limit, serializer)(ord) + val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), + limit, serializer)(classTag[InternalRow], ord) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) topK.map(r => proj(r)) From 71f2e4a8d475dbb30a7e7262df2058d79889bb1c Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 23 Sep 2016 02:31:21 +0800 Subject: [PATCH 09/13] estimate T's size --- .../main/scala/org/apache/spark/util/SizeEstimator.scala | 8 +++++++- .../scala/org/apache/spark/util/collection/Utils.scala | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index 386fdfd218a88..c057680d2a7b5 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -66,7 +66,13 @@ object SizeEstimator extends Logging { * deserialized form. This is not the same as the serialized size of the object, which will * typically be much smaller. */ - def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef]) + def estimate(obj: Any): Long = { + if (obj.isInstanceOf[AnyVal]) { + primitiveSize(obj.getClass) + } else { + estimate(obj.asInstanceOf[AnyRef], new IdentityHashMap[AnyRef, AnyRef]) + } + } // Sizes of primitive types private val BYTE_SIZE = 1 diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 4de48e278cc24..1955dd184f7f0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -46,7 +46,7 @@ private[spark] object Utils { * Returns the first K elements from the input as defined by the specified implicit Ordering[T] * and maintains the ordering. 
*/ - def takeOrdered[T: ClassTag](input: Iterator[T], num: Int, ser: Serializer) + def takeOrdered[T](input: Iterator[T], num: Int, ser: Serializer) (implicit ord: Ordering[T]): Iterator[T] = { val context = TaskContext.get() if (context == null || !input.hasNext) { From 64d7f22768e5ac746a9039ded399635dd8991861 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 23 Sep 2016 03:08:30 +0800 Subject: [PATCH 10/13] estimate T's size --- .../org/apache/spark/util/SizeEstimator.scala | 10 ++-------- .../org/apache/spark/util/collection/Utils.scala | 15 ++++++++++++--- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index c057680d2a7b5..bfaf1d80f9876 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -66,13 +66,7 @@ object SizeEstimator extends Logging { * deserialized form. This is not the same as the serialized size of the object, which will * typically be much smaller. */ - def estimate(obj: Any): Long = { - if (obj.isInstanceOf[AnyVal]) { - primitiveSize(obj.getClass) - } else { - estimate(obj.asInstanceOf[AnyRef], new IdentityHashMap[AnyRef, AnyRef]) - } - } + def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef]) // Sizes of primitive types private val BYTE_SIZE = 1 @@ -294,7 +288,7 @@ object SizeEstimator extends Logging { size } - private def primitiveSize(cls: Class[_]): Int = { + def primitiveSize(cls: Class[_]): Int = { if (cls == classOf[Byte]) { BYTE_SIZE } else if (cls == classOf[Boolean]) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 1955dd184f7f0..b98905264fed3 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -26,6 +26,8 @@ import org.apache.spark.{TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.util.{CompletionIterator, SizeEstimator} +import scala.runtime.ScalaRunTime + /** * Utility functions for collections. 
*/ @@ -54,13 +56,20 @@ private[spark] object Utils { } val iter = input.buffered - var size = SizeEstimator.estimate(iter.head) + val head = iter.head + var size = + if (ScalaRunTime.isAnyVal(head)) { + SizeEstimator.primitiveSize(head) + } else { + SizeEstimator.estimate(head) + } + if (size == 0) { size = 1024 } - val executorMemory = Runtime.getRuntime.maxMemory() - val limit = (executorMemory / size) * 0.1 + val memory = Runtime.getRuntime.maxMemory() + val limit = (memory / size) * 0.1 if (num < limit) { takeOrdered(iter, num)(ord) From 09c68d6afc5b926a9a74178df52de07a7b1e6018 Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 23 Sep 2016 03:15:09 +0800 Subject: [PATCH 11/13] estimate T's size --- .../main/scala/org/apache/spark/util/collection/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index b98905264fed3..32b8b1c768ccb 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -18,7 +18,7 @@ package org.apache.spark.util.collection import scala.collection.JavaConverters._ -import scala.reflect.ClassTag +import scala.runtime.ScalaRunTime import com.google.common.collect.{Ordering => GuavaOrdering} @@ -26,7 +26,7 @@ import org.apache.spark.{TaskContext} import org.apache.spark.serializer.Serializer import org.apache.spark.util.{CompletionIterator, SizeEstimator} -import scala.runtime.ScalaRunTime + /** * Utility functions for collections. From 09e0740bead1a4b2fd888abdbbdfdd404dff1ead Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 23 Sep 2016 10:30:01 +0800 Subject: [PATCH 12/13] estimate T's size --- .../main/scala/org/apache/spark/util/collection/Utils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index 32b8b1c768ccb..2e048cb27badf 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -59,9 +59,9 @@ private[spark] object Utils { val head = iter.head var size = if (ScalaRunTime.isAnyVal(head)) { - SizeEstimator.primitiveSize(head) + SizeEstimator.primitiveSize(head.getClass) } else { - SizeEstimator.estimate(head) + SizeEstimator.estimate(head.asInstanceOf[AnyRef]) } if (size == 0) { From 4b29ded0e678a50c53a38bcac5d0b6906141558e Mon Sep 17 00:00:00 2001 From: cenyuhai Date: Fri, 23 Sep 2016 13:54:06 +0800 Subject: [PATCH 13/13] remove classTag --- .../main/scala/org/apache/spark/sql/execution/limit.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 7fa10d3b226e8..9a7a77c1e832a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -142,15 +142,14 @@ case class TakeOrderedAndProjectExec( val ord = new LazilyGeneratedOrdering(sortOrder, child.output) val localTopK: RDD[InternalRow] = { child.execute().map(_.copy()).mapPartitions { iter => - util.collection.Utils.takeOrdered(iter, limit, serializer)(classTag[InternalRow], ord) + util.collection.Utils.takeOrdered(iter, limit, serializer)(ord) } } val 
shuffled = new ShuffledRowRDD( ShuffleExchange.prepareShuffleDependency( localTopK, child.output, SinglePartition, serializer)) shuffled.mapPartitions { iter => - val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), - limit, serializer)(classTag[InternalRow], ord) + val topK = util.collection.Utils.takeOrdered(iter.map(_.copy()), limit, serializer)(ord) if (projectList != child.output) { val proj = UnsafeProjection.create(projectList, child.output) topK.map(r => proj(r))
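
For reference, below is a sketch of the cumulative state of org.apache.spark.util.collection.Utils.takeOrdered that the series converges on by patch 12, reconstructed from the hunks above. It is illustrative only, not part of the patches: the imports are collected from the final diff state, the ExternalSorter constructor arguments and the 10%-of-heap heuristic are taken verbatim from the hunks, and none of it has been re-verified against a full source checkout.

package org.apache.spark.util.collection

import scala.collection.JavaConverters._
import scala.runtime.ScalaRunTime

import com.google.common.collect.{Ordering => GuavaOrdering}

import org.apache.spark.TaskContext
import org.apache.spark.serializer.Serializer
import org.apache.spark.util.{CompletionIterator, SizeEstimator}

private[spark] object Utils {

  // In-memory top-K: Guava's leastOf keeps only num elements resident.
  def takeOrdered[T](input: Iterator[T], num: Int)(implicit ord: Ordering[T]): Iterator[T] = {
    val ordering = new GuavaOrdering[T] {
      override def compare(l: T, r: T): Int = ord.compare(l, r)
    }
    ordering.leastOf(input.asJava, num).iterator.asScala
  }

  // Spillable top-K: falls back to the in-memory path when no TaskContext is
  // available or when num is small relative to the estimated per-element
  // footprint; otherwise sorts through ExternalSorter so the data can spill.
  def takeOrdered[T](input: Iterator[T], num: Int, ser: Serializer)
      (implicit ord: Ordering[T]): Iterator[T] = {
    val context = TaskContext.get()
    if (context == null || !input.hasNext) {
      return takeOrdered(input, num)(ord)
    }

    // Estimate element size from the first element (patch 12 distinguishes
    // AnyVal from AnyRef); default to 1 KB if the estimate comes back as 0.
    val iter = input.buffered
    val head = iter.head
    var size =
      if (ScalaRunTime.isAnyVal(head)) {
        SizeEstimator.primitiveSize(head.getClass) // made non-private in patch 10
      } else {
        SizeEstimator.estimate(head.asInstanceOf[AnyRef])
      }
    if (size == 0) {
      size = 1024
    }

    // Use the external sorter only when num elements would exceed roughly 10%
    // of the JVM max heap; below that the bounded in-memory path is cheaper.
    val memory = Runtime.getRuntime.maxMemory()
    val limit = (memory / size) * 0.1
    if (num < limit) {
      takeOrdered(iter, num)(ord)
    } else {
      val sorter = new ExternalSorter[T, Any, Any](context, None, None, Some(ord), ser)
      sorter.insertAll(iter.map(x => (x, null)))
      context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
      context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
      context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
      CompletionIterator[T, Iterator[T]](sorter.iterator.map(_._1).take(num), sorter.stop())
    }
  }
}

The TakeOrderedAndProjectExec hunks in limit.scala then route both the per-partition top-K and the post-shuffle top-K through this serializer-taking overload, while RDD.takeOrdered itself is left with its original signature (patch 06 reverts the earlier signature change and the corresponding MimaExcludes entry).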