From a6fed07f65c3c71e19f231c54fa43fc4904b040d Mon Sep 17 00:00:00 2001 From: "Perinkulam I. Ganesh" Date: Wed, 1 Jul 2015 15:50:28 -0500 Subject: [PATCH 1/3] [SPARK-8695] [core] [WIP] TreeAggregation shouldn't be triggered for 5 partitions --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 10610f4b6f1ff..586f705f80874 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1078,7 +1078,9 @@ abstract class RDD[T: ClassTag]( val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2) // If creating an extra level doesn't help reduce // the wall-clock time, we stop tree aggregation. - while (numPartitions > scale + numPartitions / scale) { + + // Don't trigger treeAggregation for 5 partitions + while (numPartitions > 5 && (numPartitions > scale + numPartitions / scale)) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { From 9ad067c4bdc038e9c3e106ce84355f002a82aad7 Mon Sep 17 00:00:00 2001 From: "Perinkulam I. Ganesh" Date: Tue, 14 Jul 2015 09:57:34 -0500 Subject: [PATCH 2/3] [SPARK-8695] [core] [WIP] TreeAggregation shouldn't be triggered for 5 partitions --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 586f705f80874..ee91e7f17f7eb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1080,7 +1080,7 @@ abstract class RDD[T: ClassTag]( // the wall-clock time, we stop tree aggregation. // Don't trigger treeAggregation for 5 partitions - while (numPartitions > 5 && (numPartitions > scale + numPartitions / scale)) { + while (numPartitions > scale + math.ceil(1.0 * numPartitions / scale)) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { From 041620c93dc72010bb0907c0c5363808878d2496 Mon Sep 17 00:00:00 2001 From: "Perinkulam I. Ganesh" Date: Mon, 20 Jul 2015 15:26:37 -0500 Subject: [PATCH 3/3] [SPARK-8695][CORE][MLlib] TreeAggregation shouldn't be triggered when it doesn't save wall-clock time. --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index ee91e7f17f7eb..d2da19aa48bf9 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1079,8 +1079,8 @@ abstract class RDD[T: ClassTag]( // If creating an extra level doesn't help reduce // the wall-clock time, we stop tree aggregation. - // Don't trigger treeAggregation for 5 partitions - while (numPartitions > scale + math.ceil(1.0 * numPartitions / scale)) { + // Don't trigger TreeAggregation when it doesn't save wall-clock time + while (numPartitions > scale + math.ceil((numPartitions).toDouble / scale)) { numPartitions /= scale val curNumPartitions = numPartitions partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex {