From e6c43ecdf4e49ea73befea8b87fb2a47eec2fb37 Mon Sep 17 00:00:00 2001 From: wangfei Date: Sun, 14 Dec 2014 17:25:17 +0800 Subject: [PATCH 1/4] adding spark.default.parallelismRatio --- .../src/main/scala/org/apache/spark/Partitioner.scala | 9 ++++++--- docs/configuration.md | 11 +++++++++++ 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index e53a78ead2c0e..1789d5deb1909 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -49,8 +49,8 @@ object Partitioner { * defaultParallelism, otherwise we'll use the max number of upstream partitions. * * Unless spark.default.parallelism is set, the number of partitions will be the - * same as the number of partitions in the largest upstream RDD, as this should - * be least likely to cause out-of-memory errors. + * same as { number of partitions in the largest upstream RDD * spark.default.parallelismRatio }, + * as this should be least likely to cause out-of-memory errors. * * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. */ @@ -62,7 +62,10 @@ object Partitioner { if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { - new HashPartitioner(bySize.head.partitions.size) + val numPartitons: Int = + rdd.context.conf + .getDouble("spark.default.parallelismRatio", 1) * bySize.head.partitions.size + new HashPartitioner(Math.max(1, numPartitons)) } } } diff --git a/docs/configuration.md b/docs/configuration.md index acee267883ed5..19aa8510f93b0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -617,6 +617,17 @@ Apart from these, the following properties are also available, and may be useful reduceByKey, and parallelize when not set by user. 
+ + spark.default.parallelismRatio + 1.0 + + Default ratio to control the number of partitions in RDDs returned by transformations like + join, reduceByKey, and parallelize when not set by user, + which has lower priority than spark.default.parallelism. If it is set as 0.5, + it means the number of partitions in these RDDs will be + Max(1, 0.5 * number of partitions in the largest upstream RDD). + + spark.broadcast.factory org.apache.spark.broadcast.
TorrentBroadcastFactory From 63826ae63bb1f912a6000f0cd958c44579960c1e Mon Sep 17 00:00:00 2001 From: wangfei Date: Sun, 14 Dec 2014 17:31:58 +0800 Subject: [PATCH 2/4] minor fix --- core/src/main/scala/org/apache/spark/Partitioner.scala | 2 +- docs/configuration.md | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 1789d5deb1909..9a3a24a61ddb8 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -49,7 +49,7 @@ object Partitioner { * defaultParallelism, otherwise we'll use the max number of upstream partitions. * * Unless spark.default.parallelism is set, the number of partitions will be the - * same as { number of partitions in the largest upstream RDD * spark.default.parallelismRatio }, + * same as {number of partitions in the largest upstream RDD * spark.default.parallelismRatio}, * as this should be least likely to cause out-of-memory errors. * * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. diff --git a/docs/configuration.md b/docs/configuration.md index 19aa8510f93b0..78f6720a66904 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -623,8 +623,8 @@ Apart from these, the following properties are also available, and may be useful Default ratio to control the number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user, - which has lower priority than spark.default.parallelism. If it is set as 0.5, - it means the number of partitions in these RDDs will be + which has lower priority than spark.default.parallelism. For example, if it is + configured as 0.5, it means the number of partitions in these RDDs will be Max(1, 0.5 * number of partitions in the largest upstream RDD). 
From a71ce3b92a3f49f8035fa14b4249775087203af5 Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 15 Dec 2014 09:01:13 +0800 Subject: [PATCH 3/4] minor fix --- core/src/main/scala/org/apache/spark/Partitioner.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 9a3a24a61ddb8..682b2b5aa2b0d 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -62,10 +62,10 @@ object Partitioner { if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { - val numPartitons: Int = + val numPartitons = rdd.context.conf .getDouble("spark.default.parallelismRatio", 1) * bySize.head.partitions.size - new HashPartitioner(Math.max(1, numPartitons)) + new HashPartitioner(Math.max(1, numPartitons.toInt)) } } } From f21bfd4904fa340099d190bd3963fefc79f0faa4 Mon Sep 17 00:00:00 2001 From: wangfei Date: Mon, 15 Dec 2014 09:11:15 +0800 Subject: [PATCH 4/4] minor fix --- core/src/main/scala/org/apache/spark/Partitioner.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 682b2b5aa2b0d..6374840b1210b 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -62,10 +62,10 @@ object Partitioner { if (rdd.context.conf.contains("spark.default.parallelism")) { new HashPartitioner(rdd.context.defaultParallelism) } else { - val numPartitons = - rdd.context.conf - .getDouble("spark.default.parallelismRatio", 1) * bySize.head.partitions.size - new HashPartitioner(Math.max(1, numPartitons.toInt)) + val ratio = rdd.context.conf.getDouble("spark.default.parallelismRatio", 1) + require(ratio > 0, 
"spark.default.parallelismRatio must be greater than 0!") + val numPartitions = (ratio * bySize.head.partitions.size).toInt + new HashPartitioner(Math.max(1, numPartitions)) } } }