From d29ff90bbc0c0da766afec7a568109b3fb9f5503 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Fri, 29 May 2015 15:14:25 +0900
Subject: [PATCH 1/3] [SPARK-7936] [SQL] Add configuration for initial size of
 hash for aggregation and limit

---
 .../scala/org/apache/spark/sql/SQLConf.scala   | 14 +++++
 .../spark/sql/execution/Aggregate.scala        | 57 +++++++++++++------
 .../spark/sql/execution/SparkStrategies.scala  |  8 ++-
 3 files changed, 59 insertions(+), 20 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9de75f4c4d084..3ce985192fcb5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -420,6 +420,16 @@ private[spark] object SQLConf {
   val USE_SQL_AGGREGATE2 = booleanConf("spark.sql.useAggregate2",
     defaultValue = Some(true), doc = "")
 
+  val AGGREGATION_HASH_INIT_SIZE = intConf(
+    "spark.sql.aggregation.hash.initSize",
+    defaultValue = Some(16384),
+    doc = "initial size of the hash map for (partial or full) aggregation")
+
+  val PARTIAL_AGGREGATION_MAX_ENTRY = intConf(
+    "spark.sql.partial.aggregation.maxEntry",
+    defaultValue = Some(-1),
+    doc = "maximum number of hash entries for partial aggregation; -1 means unlimited (default)")
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -527,6 +537,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
   private[spark] def dataFrameRetainGroupColumns: Boolean =
     getConf(DATAFRAME_RETAIN_GROUP_COLUMNS)
 
+  private[spark] def aggregationHashInitSize: Int = getConf(AGGREGATION_HASH_INIT_SIZE)
+
+  private[spark] def partialAggregationMaxEntry: Int = getConf(PARTIAL_AGGREGATION_MAX_ENTRY)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index f3b6a3a5f4a33..a09cb181630f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.sql.execution
 
-import java.util.HashMap
+import java.util
+import java.util.Map.Entry
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
@@ -36,6 +37,8 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
  *                ensure all values where `groupingExpressions` are equal are present.
  * @param groupingExpressions expressions that are evaluated to determine grouping.
  * @param aggregateExpressions expressions that are computed for each group.
+ * @param initSize initial size of the hash map for (partial or full) aggregation
+ * @param maxEntry maximum number of hash entries for partial aggregation; -1 means unlimited
  * @param child the input data source.
  */
 @DeveloperApi
@@ -43,6 +46,8 @@ case class Aggregate(
     partial: Boolean,
     groupingExpressions: Seq[Expression],
     aggregateExpressions: Seq[NamedExpression],
+    initSize: Int = -1,
+    maxEntry: Int = -1,
     child: SparkPlan)
   extends UnaryNode {
 
@@ -155,29 +160,45 @@ case class Aggregate(
       }
     } else {
       child.execute().mapPartitions { iter =>
-        val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]]
-        val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
+        val hashTable = if (initSize < 0)
+          new java.util.HashMap[InternalRow, Array[AggregateFunction1]]() else
+          new java.util.HashMap[InternalRow, Array[AggregateFunction1]](initSize)
 
-        var currentRow: InternalRow = null
-        while (iter.hasNext) {
-          currentRow = iter.next()
-          numInputRows += 1
-          val currentGroup = groupingProjection(currentRow)
-          var currentBuffer = hashTable.get(currentGroup)
-          if (currentBuffer == null) {
-            currentBuffer = newAggregateBuffer()
-            hashTable.put(currentGroup.copy(), currentBuffer)
-          }
+        val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)
 
-          var i = 0
-          while (i < currentBuffer.length) {
-            currentBuffer(i).update(currentRow)
-            i += 1
+        val staged = new Iterator[java.util.Map.Entry[InternalRow, Array[AggregateFunction1]]]() {
+
+          var current: util.Iterator[Entry[InternalRow, Array[AggregateFunction1]]] = null
+
+          override def hasNext: Boolean = iter.hasNext || (current != null && current.hasNext)
+
+          override def next(): java.util.Map.Entry[InternalRow, Array[AggregateFunction1]] = {
+            if (current == null || !current.hasNext) {
+              hashTable.clear()
+              while (iter.hasNext && (maxEntry < 0 || hashTable.size() < maxEntry)) {
+                var currentRow = iter.next()
+                numInputRows += 1
+                val currentGroup = groupingProjection(currentRow)
+                var currentBuffer = hashTable.get(currentGroup)
+                if (currentBuffer == null) {
+                  currentBuffer = newAggregateBuffer()
+                  hashTable.put(currentGroup.copy(), currentBuffer)
+                }
+
+                var i = 0
+                while (i < currentBuffer.length) {
+                  currentBuffer(i).update(currentRow)
+                  i += 1
+                }
+              }
+              current = hashTable.entrySet().iterator()
+            }
+            current.next()
           }
         }
 
         new Iterator[InternalRow] {
-          private[this] val hashTableIter = hashTable.entrySet().iterator()
+          private[this] val hashTableIter = staged
           private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
           private[this] val resultProjection =
             new InterpretedMutableProjection(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e40d77689045..3718a025085c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _}
 import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand}
-import org.apache.spark.sql.types._
 import org.apache.spark.sql.{SQLContext, Strategy, execution}
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -160,10 +159,14 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
             partial = false,
             namedGroupingAttributes,
             rewrittenAggregateExpressions,
+            sqlContext.conf.aggregationHashInitSize,
+            -1,
             execution.Aggregate(
               partial = true,
               groupingExpressions,
               partialComputation,
+              sqlContext.conf.aggregationHashInitSize,
+              sqlContext.conf.partialAggregationMaxEntry,
               planLater(child))) :: Nil
 
         case _ => Nil
@@ -355,7 +358,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           Nil
         } else {
           Utils.checkInvalidAggregateFunction2(a)
-          execution.Aggregate(partial = false, group, agg, planLater(child)) :: Nil
+          execution.Aggregate(partial = false, group, agg,
+            sqlContext.conf.aggregationHashInitSize, -1, planLater(child)) :: Nil
         }
       }
     case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, child) =>

From c7ec8fb6fabb5b8ca2c5043ec0f59b77d18a75db Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Fri, 21 Aug 2015 16:39:25 +0900
Subject: [PATCH 2/3] Fixed scalastyle failure

---
 .../scala/org/apache/spark/sql/execution/Aggregate.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
index a09cb181630f2..67556bd08368c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Aggregate.scala
@@ -160,9 +160,11 @@ case class Aggregate(
       }
     } else {
       child.execute().mapPartitions { iter =>
-        val hashTable = if (initSize < 0)
-          new java.util.HashMap[InternalRow, Array[AggregateFunction1]]() else
+        val hashTable = if (initSize < 0) {
+          new java.util.HashMap[InternalRow, Array[AggregateFunction1]]()
+        } else {
           new java.util.HashMap[InternalRow, Array[AggregateFunction1]](initSize)
+        }
 
         val groupingProjection = new InterpretedMutableProjection(groupingExpressions, child.output)

From 39a9c4184952c90673a1a9766a72bfc120c23123 Mon Sep 17 00:00:00 2001
From: "navis.ryu"
Date: Fri, 18 Sep 2015 11:21:48 +0900
Subject: [PATCH 3/3] Changed prefix to spark.sql.partialAggregation

---
 sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 3ce985192fcb5..5d4d994d02045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -426,7 +426,7 @@ private[spark] object SQLConf {
     doc = "initial size of the hash map for (partial or full) aggregation")
 
   val PARTIAL_AGGREGATION_MAX_ENTRY = intConf(
-    "spark.sql.partial.aggregation.maxEntry",
+    "spark.sql.partialAggregation.maxEntry",
     defaultValue = Some(-1),
     doc = "maximum number of hash entries for partial aggregation; -1 means unlimited (default)")
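
The heart of PATCH 1/3 is the `staged` iterator: instead of aggregating a whole partition into one hash map, it fills the map until `maxEntry` entries, emits the partial results collected so far, clears the map, and keeps consuming input. A group key can therefore be emitted more than once per partition, which is safe only because the final (partial = false) Aggregate merges the duplicates; that is also why SparkStrategies passes -1 (unlimited) as the final operator's maxEntry. Below is a minimal standalone sketch of this flush-and-continue pattern, with plain Scala standing in for Spark internals: word counting replaces the `AggregateFunction1` buffers, and `BoundedPartialAgg`/`partialCounts` are illustrative names, not Spark API.

import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

object BoundedPartialAgg {

  // Aggregates `rows` into (key, count) pairs, flushing the hash map whenever
  // it reaches `maxEntry` entries, mirroring the patch's `staged` iterator.
  // A key may appear more than once in the output; a downstream "final"
  // aggregate is expected to merge those duplicates.
  def partialCounts[K](rows: Iterator[K], initSize: Int, maxEntry: Int): Iterator[(K, Long)] =
    new Iterator[(K, Long)] {
      private val hashTable =
        if (initSize < 0) new JHashMap[K, Long]() else new JHashMap[K, Long](initSize)
      private var current: Iterator[(K, Long)] = Iterator.empty

      override def hasNext: Boolean = current.hasNext || rows.hasNext

      override def next(): (K, Long) = {
        if (!current.hasNext) {
          hashTable.clear()
          // Fill until the input is exhausted or the entry cap is hit.
          while (rows.hasNext && (maxEntry < 0 || hashTable.size() < maxEntry)) {
            val k = rows.next()
            hashTable.put(k, hashTable.getOrDefault(k, 0L) + 1L)
          }
          current = hashTable.entrySet().iterator().asScala.map(e => (e.getKey, e.getValue))
        }
        current.next()
      }
    }

  def main(args: Array[String]): Unit = {
    val words = Iterator("a", "b", "a", "c", "b", "a")
    // With maxEntry = 2 the map is flushed early, so "a" shows up in several
    // partial results instead of a single (a, 3).
    partialCounts(words, initSize = 16, maxEntry = 2).foreach(println)
  }
}

The trade-off is the same as in the patch: a smaller `maxEntry` bounds the memory held by partial aggregation at the cost of weaker pre-shuffle reduction, since duplicate partial rows for a key must travel to the final aggregate.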
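
For reference, here is a sketch of how these knobs might be set from user code against a Spark 1.5-era SQLContext. The application name, toy table, and query are invented for illustration; the keys shown are the final ones after PATCH 3/3 (the rename left `spark.sql.aggregation.hash.initSize` untouched). Note that this series patches the pre-Tungsten `execution.Aggregate` operator, so the settings only matter on plans that SparkStrategies routes through it.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object AggConfExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("agg-conf-demo").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Pre-size the (partial or full) aggregation hash map to cut down on rehashing.
    sqlContext.setConf("spark.sql.aggregation.hash.initSize", "65536")
    // Cap partial-aggregation state; once the map holds this many entries it is
    // flushed downstream and reused (-1, the default, means unlimited).
    sqlContext.setConf("spark.sql.partialAggregation.maxEntry", "100000")

    import sqlContext.implicits._
    val df = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))).toDF("k", "v")
    df.registerTempTable("t")
    sqlContext.sql("SELECT k, SUM(v) AS total FROM t GROUP BY k").show()

    sc.stop()
  }
}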