From a7fbc74335c2df27002e8158f8e83a919195eed7 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Mon, 6 Aug 2018 11:04:31 -0700
Subject: [PATCH 1/7] [SPARK-21436] Take advantage of known partitioner for
 distinct on RDDs to avoid a shuffle.

Special case the situation where we know the partitioner and the number of
requested output partitions matches the current partitioner's, so we can avoid
a shuffle and instead compute distinct inside of each partition.
---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala  | 11 ++++++++++-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala  | 12 ++++++++++++
 2 files changed, 22 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0574abdca32ac..471b9e0a1a877 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -396,7 +396,16 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    // If the data is already approriately partioned with a known partioner we can work locally.
+    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
+      val set = new mutable.HashSet[T]() ++= itr
+      set.toIterator
+    }
+    partitioner match {
+      case Some(p) if numPartitions == partitions.length =>
+        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
+      case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+    }
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index b143a468a1baf..3001a2b005d8b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -95,6 +95,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
+  test("distinct with known partioner does not cause shuffle") {
+    val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
+    val initialPartioner = rdd.partitioner
+    val distinctRdd = rdd.distinct()
+    val resultingPartioner = distinctRdd.partitioner
+    assert(initialPartioner === resultingPartioner)
+    val distinctRddDifferent = rdd.distinct(5)
+    val distinctRddDifferentPartioner = distinctRddDifferent.partitioner
+    assert(initialPartioner != distinctRddDifferentPartioner)
+    assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
+  }
+
   test("countApproxDistinct") {
 
     def error(est: Long, size: Long): Double = math.abs(est - size) / size.toDouble
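The user-visible effect of this first patch: calling distinct() on an RDD that already has a
partitioner, with an unchanged partition count, keeps that partitioner and deduplicates within
partitions rather than shuffling again. Below is a minimal driver-side sketch of that behaviour
(an illustrative standalone snippet mirroring the new test, not part of the patch; the object
name is made up, it assumes a local master, and the assertion only holds once this change is
applied):

import org.apache.spark.{SparkConf, SparkContext}

object DistinctKeepsPartitionerSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("distinct-sketch"))
    // sortByKey leaves the RDD range-partitioned, so equal pairs already share a partition.
    val sorted = sc.parallelize(1 to 100, 10).map(x => (x % 10, x % 10)).sortByKey()
    // With the special case above, distinct() at the same partition count reuses the
    // existing partitioner and removes duplicates inside each partition.
    val deduped = sorted.distinct()
    assert(deduped.partitioner == sorted.partitioner)
    sc.stop()
  }
}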
From 5fd36592a26b07fdb58e79e4efbb6b70daea54df Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Fri, 10 Aug 2018 11:10:31 -0700
Subject: [PATCH 2/7] CR feedback, reduce # of passes over data from 2 to 1 and
 fix some spelling issues.

---
 core/src/main/scala/org/apache/spark/rdd/RDD.scala  |  6 +++---
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala  | 12 ++++++------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 471b9e0a1a877..d9eff9f9b0ac1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -396,10 +396,10 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    // If the data is already approriately partioned with a known partioner we can work locally.
+    // If the data is already approriately partitioned with a known partitioner we can work locally.
     def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
-      val set = new mutable.HashSet[T]() ++= itr
-      set.toIterator
+      val set = new mutable.HashSet[T]()
+      itr.filter(set.add(_))
     }
     partitioner match {
       case Some(p) if numPartitions == partitions.length =>
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 3001a2b005d8b..ca1d399a702e8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -95,15 +95,15 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(!deserial.toString().isEmpty())
   }
 
-  test("distinct with known partioner does not cause shuffle") {
+  test("distinct with known partitioner does not cause shuffle") {
     val rdd = sc.parallelize(1.to(100), 10).map(x => (x % 10, x % 10)).sortByKey()
-    val initialPartioner = rdd.partitioner
+    val initialPartitioner = rdd.partitioner
     val distinctRdd = rdd.distinct()
-    val resultingPartioner = distinctRdd.partitioner
-    assert(initialPartioner === resultingPartioner)
+    val resultingPartitioner = distinctRdd.partitioner
+    assert(initialPartitioner === resultingPartitioner)
     val distinctRddDifferent = rdd.distinct(5)
-    val distinctRddDifferentPartioner = distinctRddDifferent.partitioner
-    assert(initialPartioner != distinctRddDifferentPartioner)
+    val distinctRddDifferentPartitioner = distinctRddDifferent.partitioner
+    assert(initialPartitioner != distinctRddDifferentPartitioner)
     assert(distinctRdd.collect().sorted === distinctRddDifferent.collect().sorted)
   }
 
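The single-pass rewrite above relies on mutable.HashSet.add returning false for elements that are
already present, so one lazy filter pass both records and tests membership. A minimal standalone
sketch of the idiom (plain Scala, illustrative only, no Spark required; the names are made up):

import scala.collection.mutable

object FilterAddSketch {
  // Yields each element only the first time it is seen, in a single lazy pass.
  def dedup[T](itr: Iterator[T]): Iterator[T] = {
    val seen = new mutable.HashSet[T]()
    itr.filter(seen.add)
  }

  def main(args: Array[String]): Unit = {
    assert(dedup(Iterator(1, 2, 2, 3, 1)).toList == List(1, 2, 3))
  }
}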
From e96b7907c10e2fe3915243258507bf44f5c7b990 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Thu, 6 Sep 2018 10:04:27 -0700
Subject: [PATCH 3/7] Add a partitioner which wraps a previous partitioner and
 calls it with just the key of a k/v tuple

---
 core/src/main/scala/org/apache/spark/Partitioner.scala | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index c940cb25d478b..5c1df1921b60d 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -347,3 +347,11 @@ private[spark] object RangePartitioner {
     bounds.toArray
   }
 }
+
+// Takes a partitioner on K and uses the same on type partitioner on type K, null
+private[spark] class WrappedPartitioner(original: Partitioner) extends Partitioner {
+  def numPartitions: Int = original.numPartitions
+  def getPartition(key: Any): Int = {
+    original.getPartition(key.asInstanceOf[Tuple2[Any, Any]]._1)
+  }
+}

From bb28c4b6f60b809034e46a67d8fd6e98cab923c8 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Thu, 6 Sep 2018 10:05:41 -0700
Subject: [PATCH 4/7] Add a param to MapPartitionsRDD to supply a
 knownPartitioner if we know what partitioner will be the result of our
 function.

---
 .../apache/spark/rdd/MapPartitionsRDD.scala | 19 ++++++++++++++++---
 1 file changed, 16 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index e4587c96eae1c..9762269e8eda1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -19,18 +19,31 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, Partitioner, TaskContext}
 
 /**
  * An RDD that applies the provided function to every partition of the parent RDD.
+ *
+ * @param prev The RDD being mapped over.
+ * @param f The function applied to each partition.
+ * @param preservesPartitioning If the function does not change the keys, or
+ *   does so in such a way that the previous partitioner is still valid.
+ * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
-    preservesPartitioning: Boolean = false)
+    preservesPartitioning: Boolean = false,
+    knownPartitioner: Option[Partitioner] = None)
   extends RDD[U](prev) {
 
-  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+  override val partitioner = {
+    if (preservesPartitioning) {
+      firstParent[T].partitioner
+    } else {
+      knownPartitioner
+    }
+  }
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
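Taken together, WrappedPartitioner and the knownPartitioner parameter let the keyed (T, null) RDD
built in the next patch report a partitioner equal to the one reduceByKey is asked to use;
PairRDDFunctions only inserts a ShuffledRDD when those partitioners differ. A hedged sketch of
that existing behaviour using only public API (illustrative only; assumes a SparkContext named
sc, and the variable names are made up):

import org.apache.spark.HashPartitioner

// Reducing with a partitioner equal to the one the RDD already has stays map-side:
// no ShuffledRDD is created and the partitioner is preserved.
val pairs = sc.parallelize(Seq((1, "a"), (2, "b"), (1, "c"))).partitionBy(new HashPartitioner(4))
val reduced = pairs.reduceByKey(new HashPartitioner(4), _ + _)
assert(reduced.partitioner == pairs.partitioner)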
From 7ed7589bcba9273aa14ba207bfaf5bb67b57e6c8 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Thu, 6 Sep 2018 10:07:13 -0700
Subject: [PATCH 5/7] Use the wrapped partitioner and the ability to specify a
 given known partitioner to MapPartitionsRDD to compute the distinct of an RDD
 without causing a shuffle when we have a known partitioner.

This is done with reduceByKey instead of the mutable.HashSet earlier so as to
allow spilling to disk for large partitions.
---
 .../main/scala/org/apache/spark/rdd/RDD.scala | 22 ++++++++++++++-----
 1 file changed, 16 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index d9eff9f9b0ac1..647a413e5acad 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -396,14 +396,24 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
-    // If the data is already approriately partitioned with a known partitioner we can work locally.
-    def removeDuplicatesInPartition(itr: Iterator[T]): Iterator[T] = {
-      val set = new mutable.HashSet[T]()
-      itr.filter(set.add(_))
-    }
     partitioner match {
       case Some(p) if numPartitions == partitions.length =>
-        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
+        def key(x: T): (T, Null) = (x, null)
+        val cleanKey = sc.clean(key _)
+        val keyed = new MapPartitionsRDD[(T, Null), T](
+          this,
+          (context, pid, iter) => iter.map(cleanKey),
+          knownPartitioner = Some(new WrappedPartitioner(p)))
+        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
+
+        def deKey(x: (T, Null)): T = x._1
+        val cleanDeKey = sc.clean(deKey _)
+        val deKeyed = new MapPartitionsRDD[T, (T, Null)](
+          duplicatesRemoved,
+          (context, pid, iter) => iter.map(cleanDeKey),
+          knownPartitioner = Some(p))
+        deKeyed
+
       case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
     }
   }
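A quick way to see the effect of this patch (an illustrative check, not part of the patch;
assumes a SparkContext named sc and the patched distinct) is to compare lineages: at an
unchanged partition count the debug string should show no shuffle beyond the one sortByKey
already introduced, while asking for a different partition count still falls back to the
shuffling reduceByKey path.

val sorted = sc.parallelize(1 to 100, 10).map(x => (x % 10, x)).sortByKey()
// Same partition count: no new shuffle boundary expected in the printed lineage.
println(sorted.distinct().toDebugString)
// Different partition count: falls back to the reduceByKey path and shuffles.
println(sorted.distinct(5).toDebugString)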
From 849f67bf6c9a54007fec63a0b97cecfc7137e0be Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Tue, 25 Sep 2018 16:20:29 -0700
Subject: [PATCH 6/7] Instead of specializing MapPartitionsRDD directly, use
 ExternalAppendOnlyMap inside of our distinct function

---
 .../scala/org/apache/spark/Partitioner.scala  |  8 ------
 .../apache/spark/rdd/MapPartitionsRDD.scala   | 12 ++------
 .../main/scala/org/apache/spark/rdd/RDD.scala | 29 ++++++++-----------
 3 files changed, 14 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index a147648d41ba0..515237558fd87 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -350,11 +350,3 @@ private[spark] object RangePartitioner {
     bounds.toArray
   }
 }
-
-// Takes a partitioner on K and uses the same on type partitioner on type K, null
-private[spark] class WrappedPartitioner(original: Partitioner) extends Partitioner {
-  def numPartitions: Int = original.numPartitions
-  def getPartition(key: Any): Int = {
-    original.getPartition(key.asInstanceOf[Tuple2[Any, Any]]._1)
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index f90459c17924e..ab287945d6624 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -35,24 +35,16 @@ import org.apache.spark.{Partition, Partitioner, TaskContext}
  * @param isOrderSensitive whether or not the function is order-sensitive. If it's order
  *                         sensitive, it may return totally different result when the input order
  *                         is changed. Mostly stateful functions are order-sensitive.
- * @param knownPartitioner If the result has a known partitioner.
  */
 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
     var prev: RDD[T],
     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
     preservesPartitioning: Boolean = false,
     isFromBarrier: Boolean = false,
-    isOrderSensitive: Boolean = false,
-    knownPartitioner: Option[Partitioner] = None)
+    isOrderSensitive: Boolean = false)
   extends RDD[U](prev) {
 
-  override val partitioner = {
-    if (preservesPartitioning) {
-      firstParent[T].partitioner
-    } else {
-      knownPartitioner
-    }
-  }
+  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
 
   override def getPartitions: Array[Partition] = firstParent[T].partitions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 199548a60c199..743e3441eea55 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -42,7 +42,8 @@ import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.{RDDBlockId, StorageLevel}
 import org.apache.spark.util.{BoundedPriorityQueue, Utils}
-import org.apache.spark.util.collection.{OpenHashMap, Utils => collectionUtils}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap,
+  Utils => collectionUtils}
 import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
   SamplingUtils}
 
@@ -396,24 +397,18 @@ abstract class RDD[T: ClassTag](
    * Return a new RDD containing the distinct elements in this RDD.
    */
   def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
+    def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
+      // Create an instance of external append only map which ignores values.
+      val map = new ExternalAppendOnlyMap[T, Null, Null](
+        createCombiner = value => null,
+        mergeValue = (a, b) => a,
+        mergeCombiners = (a, b) => a)
+      map.insertAll(partition.map(_ -> null))
+      map.iterator.map(_._1)
+    }
     partitioner match {
       case Some(p) if numPartitions == partitions.length =>
-        def key(x: T): (T, Null) = (x, null)
-        val cleanKey = sc.clean(key _)
-        val keyed = new MapPartitionsRDD[(T, Null), T](
-          this,
-          (context, pid, iter) => iter.map(cleanKey),
-          knownPartitioner = Some(new WrappedPartitioner(p)))
-        val duplicatesRemoved = keyed.reduceByKey((x, y) => x)
-
-        def deKey(x: (T, Null)): T = x._1
-        val cleanDeKey = sc.clean(deKey _)
-        val deKeyed = new MapPartitionsRDD[T, (T, Null)](
-          duplicatesRemoved,
-          (context, pid, iter) => iter.map(cleanDeKey),
-          knownPartitioner = Some(p))
-        deKeyed
-
+        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
       case _ => map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
     }
   }
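ExternalAppendOnlyMap is private[spark], but the three functions handed to it follow the same
contract as the public combineByKey: createCombiner builds the first combiner for a key,
mergeValue folds another value into an existing combiner, and mergeCombiners merges two
combiners (for example after a spill). In the patch all three ignore the values, which turns the
map into a spillable set of keys. A user-level sketch of the same contract, counting occurrences
per key (illustrative only; assumes a SparkContext named sc):

// createCombiner / mergeValue / mergeCombiners, as combineByKey exposes them publicly.
val counts = sc.parallelize(Seq("a", "b", "a")).map((_, 1)).combineByKey(
  (v: Int) => v,                  // createCombiner: start from the first value for a key
  (c: Int, v: Int) => c + v,      // mergeValue: fold another value into the combiner
  (c1: Int, c2: Int) => c1 + c2)  // mergeCombiners: merge combiners built separately
// counts.collect() gives Array((a,2), (b,1)) in some order.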
From 95357cff3da95c962c575f1b8efe155841ed78a5 Mon Sep 17 00:00:00 2001
From: Holden Karau
Date: Wed, 26 Sep 2018 11:43:34 -0700
Subject: [PATCH 7/7] Remove unneeded change to MapPartitionsRDD

---
 core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index ab287945d6624..aa61997122cf4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.{Partition, Partitioner, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 /**
  * An RDD that applies the provided function to every partition of the parent RDD.