From 238e81b926779ccf788d97c8a78306811dfbf2e9 Mon Sep 17 00:00:00 2001 From: Eric Moyer Date: Wed, 7 Jan 2015 15:49:33 -0500 Subject: [PATCH 1/2] Doc that groupByKey will OOM for large keys Documented that maximum number of values per key for groupByKey is limited by available RAM (see [Datablox][datablox link] and [the spark mailing list][list link]). Just saying that better performance is available is not sufficient. Sometimes you need to do a group-by - your operation needs all the items available in order to complete. This warning explains the problem. [datablox link]: http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html [list link]: http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-RDD-GroupBy-OutOfMemory-Exceptions-tp11427p11466.html --- .../main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f8df5b2a08866..f766d151cad2f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -437,6 +437,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. + * + * Note: As currently implemented, GroupByKey must be able to hold all the key-value pairs for any + * key in memory. If a key has too many values, it can result in an out-of-memory exception. */ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not @@ -458,6 +461,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * Note: This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. + * + * Note: As currently implemented, GroupByKey must be able to hold all the key-value pairs for any + * key in memory. If a key has too many values, it can result in an out-of-memory exception. */ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) From 5b6f4e96b365641ee028f183f4df6d6f9bfcb702 Mon Sep 17 00:00:00 2001 From: Eric Moyer Date: Thu, 8 Jan 2015 11:32:27 -0500 Subject: [PATCH 2/2] groupByKey docs naming updates Fixed capitalization of groupByKey method and used canonical name of OutOfMemoryError --- .../scala/org/apache/spark/rdd/PairRDDFunctions.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index f766d151cad2f..38f8f36a4a4db 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -438,8 +438,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. * - * Note: As currently implemented, GroupByKey must be able to hold all the key-value pairs for any - * key in memory. If a key has too many values, it can result in an out-of-memory exception. + * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any + * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not @@ -462,8 +462,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]] * or [[PairRDDFunctions.reduceByKey]] will provide much better performance. * - * Note: As currently implemented, GroupByKey must be able to hold all the key-value pairs for any - * key in memory. If a key has too many values, it can result in an out-of-memory exception. + * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any + * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]]. */ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions))