From 9f5149fb1c110f01b3877245d46c98ba0831a56f Mon Sep 17 00:00:00 2001 From: Guillaume Poulin Date: Fri, 18 Dec 2015 11:09:08 -0500 Subject: [PATCH 1/2] DStream union optimisation Use PartitionerAwareUnionRDD when possible for optimizing shuffling and preserving the partitioner. --- .../org/apache/spark/streaming/dstream/UnionDStream.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index c1846a31f6605..8a5b1f324a9a1 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.SparkException -import org.apache.spark.rdd.{RDD, UnionRDD} +import org.apache.spark.rdd.{RDD, UnionRDD, PartitionerAwareUnionRDD} import org.apache.spark.streaming.{Duration, Time} private[streaming] @@ -45,7 +45,11 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) s" time $validTime") } if (rdds.nonEmpty) { - Some(new UnionRDD(ssc.sc, rdds)) + if(rdds.forall(_.partitioner.isDefined) && rdds.flatMap(_.partitioner).toSet.size == 1) { + Some(new PartitionerAwareUnionRDD(ssc.sc, rdds)) + } else { + Some(new UnionRDD(ssc.sc, rdds)) + } } else { None } From 3bb5ea3007d89a58d9eb1925f3334c7700ab71e3 Mon Sep 17 00:00:00 2001 From: Guillaume Poulin Date: Tue, 22 Dec 2015 15:19:59 -0500 Subject: [PATCH 2/2] Simplify RDD union Deduplicate logic to determine if `PartitionerAwareUnionRDD` should be used instead of `UnionRDD`. 
--- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6 +----- .../apache/spark/streaming/dstream/UnionDStream.scala | 8 ++------ .../spark/streaming/dstream/WindowedDStream.scala | 11 ++--------- 3 files changed, 5 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 4a0a2199ef7e8..032939b49a708 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -568,11 +568,7 @@ abstract class RDD[T: ClassTag]( * times (use `.distinct()` to eliminate them). */ def union(other: RDD[T]): RDD[T] = withScope { - if (partitioner.isDefined && other.partitioner == partitioner) { - new PartitionerAwareUnionRDD(sc, Array(this, other)) - } else { - new UnionRDD(sc, Array(this, other)) - } + sc.union(this, other) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala index 8a5b1f324a9a1..d46c0a01e05d9 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala @@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import org.apache.spark.SparkException -import org.apache.spark.rdd.{RDD, UnionRDD, PartitionerAwareUnionRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Duration, Time} private[streaming] @@ -45,11 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]]) s" time $validTime") } if (rdds.nonEmpty) { - if(rdds.forall(_.partitioner.isDefined) && rdds.flatMap(_.partitioner).toSet.size == 1) { - Some(new PartitionerAwareUnionRDD(ssc.sc, rdds)) - } else { - Some(new UnionRDD(ssc.sc, rdds)) - } + Some(ssc.sc.union(rdds)) } else { None } diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala index ee50a8d024e12..fe0f875525660 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream import scala.reflect.ClassTag -import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ import org.apache.spark.streaming.Duration @@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag]( override def compute(validTime: Time): Option[RDD[T]] = { val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val rddsInWindow = parent.slice(currentWindow) - val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { - logDebug("Using partition aware union for windowing at " + validTime) - new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) - } else { - logDebug("Using normal union for windowing at " + validTime) - new UnionRDD(ssc.sc, rddsInWindow) - } - Some(windowRDD) + Some(ssc.sc.union(rddsInWindow)) } }