From ce5bed05413b9ef594ca07598d8a292f994d928e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 15 Jan 2016 22:19:25 -0800 Subject: [PATCH 1/5] generate aggregation --- .../sql/execution/WholeStageCodegen.scala | 12 +- .../aggregate/TungstenAggregate.scala | 104 +++++++++++++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 5 + .../BenchmarkWholeStageCodegen.scala | 10 +- 4 files changed, 119 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index c15fabab805a7..57f4945de9804 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -264,12 +264,16 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan]) */ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Rule[SparkPlan] { + private def supportCodegen(e: Expression): Boolean = e match { + case e: LeafExpression => true + // CodegenFallback requires the input to be an InternalRow + case e: CodegenFallback => false + case _ => true + } + private def supportCodegen(plan: SparkPlan): Boolean = plan match { case plan: CodegenSupport if plan.supportCodegen => - // Non-leaf with CodegenFallback does not work with whole stage codegen - val willFallback = plan.expressions.exists( - _.find(e => e.isInstanceOf[CodegenFallback] && !e.isInstanceOf[LeafExpression]).isDefined - ) + val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns val haveManyColumns = plan.output.length > 200 !willFallback && !haveManyColumns diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index a9cf04388d2e8..0580fe07f06a6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -21,9 +21,10 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.physical._ -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap} +import org.apache.spark.sql.execution.{CodegenSupport, SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.StructType @@ -35,7 +36,7 @@ case class TungstenAggregate( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends UnaryNode { + extends UnaryNode with CodegenSupport { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -112,6 +113,103 @@ case class TungstenAggregate( } } + override def supportCodegen: Boolean = { + groupingExpressions.isEmpty && + // ImperativeAggregate is not supported right now + !aggregateExpressions.exists(_.aggregateFunction.isInstanceOf[ImperativeAggregate]) && + // final aggregation only have one row, do not need to codegen + !aggregateExpressions.exists(e => e.mode == Final || e.mode == Complete) + } + + // The variables used as aggregation buffer + private var bufVars: Seq[ExprCode] = _ + + private val modes = aggregateExpressions.map(_.mode).distinct + + protected override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = { + val initAgg = ctx.freshName("initAgg") + ctx.addMutableState("boolean", initAgg, s"$initAgg = false;") + + // generate variables for aggregation buffer + val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) + val initExpr = functions.flatMap(f => f.initialValues) + bufVars = initExpr.map { e => + var isNull = ctx.freshName("bufIsNull") + val value = ctx.freshName("bufValue") + // The initial expression should not access any column + val ev = e.gen(ctx) + val initVars = if (e.nullable) { + s""" + | boolean $isNull = ${ev.isNull}; + | ${ctx.javaType(e.dataType)} $value = ${ctx.defaultValue(e.dataType)}; + | if (!${ev.isNull}) { + | $value = ${ev.value}; + | } + """.stripMargin + } else { + isNull = "false" + s"${ctx.javaType(e.dataType)} $value = ${ev.value};" + } + ExprCode(ev.code + initVars, isNull, value) + } + + val (rdd, childSource) = child.asInstanceOf[CodegenSupport].produce(ctx, this) + val source = + s""" + | if (!$initAgg) { + | $initAgg = true; + | + | // initialize aggregation buffer + | ${bufVars.map(_.code).mkString("\n")} + | + | $childSource + | + | // output the result + | ${consume(ctx, bufVars)} + | } + """.stripMargin + + (rdd, source) + } + + override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { + // only have DeclarativeAggregate + val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) + // the model could be only Partial or PartialMerge + val updateExpr = if (modes.contains(Partial)) { + functions.flatMap(_.updateExpressions) + } else { + functions.flatMap(_.mergeExpressions) + } + + val inputAttr = functions.flatMap(_.aggBufferAttributes) ++ child.output + val boundExpr = updateExpr.map(e => BindReferences.bindReference(e, inputAttr)) + ctx.currentVars = bufVars ++ input + // TODO: support subexpression elimination + val codes = boundExpr.zipWithIndex.map { case (e, i) => + val ev = e.gen(ctx) + if (e.nullable) { + s""" + | ${ev.code} + | ${bufVars(i).isNull} = ${ev.isNull}; + | if (!${ev.isNull}) { + | ${bufVars(i).value} = ${ev.value}; + | } + """.stripMargin + } else { + s""" + | ${ev.code} + | ${bufVars(i).value} = ${ev.value}; + """.stripMargin + } + } + + s""" + | // do aggregate and update aggregation buffer + | ${codes.mkString("")} + """.stripMargin + } + override def simpleString: String = { val allAggregateExpressions = aggregateExpressions diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index bd11a387a1d5d..366081de75ce7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -36,6 +36,11 @@ import org.apache.spark.sql.types._ class DataFrameSuite extends QueryTest with SharedSQLContext { import testImplicits._ + test("wholecodegen") { + sqlContext.range(1<<4).filter("(id & 1) = 1").groupBy().count().explain(true) + assert(sqlContext.range(1<<4).filter("(id & 1) = 1").count() === (1<<3)) + } + test("analysis error should be eagerly reported") { // Eager analysis. withSQLConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 788b04fcf8c2e..7291039a3aa9e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -46,15 +46,15 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { /* Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz - Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate - ------------------------------------------------------------------------- - Without whole stage codegen 6725.52 31.18 1.00 X - With whole stage codegen 2233.05 93.91 3.01 X + Single Int Column Scan: Avg Time(ms) Avg Rate(M/s) Relative Rate + ------------------------------------------------------------------------------- + Without whole stage codegen 7775.53 26.97 1.00 X + With whole stage codegen 342.15 612.94 22.73 X */ benchmark.run() } - ignore("benchmark") { + test("benchmark") { testWholeStage(1024 * 1024 * 200) } } From 006f37a34ac7332b65f0f238abe25a2bf1f7725a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 19 Jan 2016 15:04:32 -0800 Subject: [PATCH 2/5] add test --- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 5 ----- .../sql/execution/BenchmarkWholeStageCodegen.scala | 2 +- .../spark/sql/execution/WholeStageCodegenSuite.scala | 11 +++++++++++ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 366081de75ce7..bd11a387a1d5d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -36,11 +36,6 @@ import org.apache.spark.sql.types._ class DataFrameSuite extends QueryTest with SharedSQLContext { import testImplicits._ - test("wholecodegen") { - sqlContext.range(1<<4).filter("(id & 1) = 1").groupBy().count().explain(true) - assert(sqlContext.range(1<<4).filter("(id & 1) = 1").count() === (1<<3)) - } - test("analysis error should be eagerly reported") { // Eager analysis. withSQLConf(SQLConf.DATAFRAME_EAGER_ANALYSIS.key -> "true") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala index 7291039a3aa9e..c4aad398bfa54 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BenchmarkWholeStageCodegen.scala @@ -54,7 +54,7 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite { benchmark.run() } - test("benchmark") { + ignore("benchmark") { testWholeStage(1024 * 1024 * 200) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index c54fc6ba2de3d..19c833e348c79 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.execution.aggregate.TungstenAggregate import org.apache.spark.sql.test.SharedSQLContext class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { @@ -35,4 +37,13 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { sortAnswers = false ) } + + test("Aggregate should be included in WholeStageCodegen") { + val df = sqlContext.range(10).filter("id > 1").groupBy().count() + val plan = df.queryExecution.executedPlan + assert(plan.find(p => + p.isInstanceOf[WholeStageCodegen] && + p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[TungstenAggregate]).isDefined) + assert(df.collect() === Array(Row(8))) + } } From 60ebebcd172888dff5d4b1b94401e36ca79bf332 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 10:53:36 -0800 Subject: [PATCH 3/5] fix tests --- .../aggregate/TungstenAggregate.scala | 35 +++++-------------- .../execution/metric/SQLMetricsSuite.scala | 4 ++- 2 files changed, 12 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 0580fe07f06a6..1f3160dd95b0b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -134,22 +134,14 @@ case class TungstenAggregate( val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) val initExpr = functions.flatMap(f => f.initialValues) bufVars = initExpr.map { e => - var isNull = ctx.freshName("bufIsNull") + val isNull = ctx.freshName("bufIsNull") val value = ctx.freshName("bufValue") // The initial expression should not access any column val ev = e.gen(ctx) - val initVars = if (e.nullable) { - s""" - | boolean $isNull = ${ev.isNull}; - | ${ctx.javaType(e.dataType)} $value = ${ctx.defaultValue(e.dataType)}; - | if (!${ev.isNull}) { - | $value = ${ev.value}; - | } - """.stripMargin - } else { - isNull = "false" - s"${ctx.javaType(e.dataType)} $value = ${ev.value};" - } + val initVars = s""" + | boolean $isNull = ${ev.isNull}; + | ${ctx.javaType(e.dataType)} $value = ${ev.value}; + """.stripMargin ExprCode(ev.code + initVars, isNull, value) } @@ -188,20 +180,11 @@ case class TungstenAggregate( // TODO: support subexpression elimination val codes = boundExpr.zipWithIndex.map { case (e, i) => val ev = e.gen(ctx) - if (e.nullable) { - s""" - | ${ev.code} - | ${bufVars(i).isNull} = ${ev.isNull}; - | if (!${ev.isNull}) { - | ${bufVars(i).value} = ${ev.value}; - | } + s""" + | ${ev.code} + | ${bufVars(i).isNull} = ${ev.isNull}; + | ${bufVars(i).value} = ${ev.value}; """.stripMargin - } else { - s""" - | ${ev.code} - | ${bufVars(i).value} = ${ev.value}; - """.stripMargin - } } s""" diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index 4339f7260dcb9..51285431a47ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -71,7 +71,9 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext { expectedNumOfJobs: Int, expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = { val previousExecutionIds = sqlContext.listener.executionIdToData.keySet - df.collect() + withSQLConf("spark.sql.codegen.wholeStage" -> "false") { + df.collect() + } sparkContext.listenerBus.waitUntilEmpty(10000) val executionIds = sqlContext.listener.executionIdToData.keySet.diff(previousExecutionIds) assert(executionIds.size === 1) From 7ccd901c2e29261a9b46f273be3520725ec01521 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 13:01:56 -0800 Subject: [PATCH 4/5] address comments --- .../spark/sql/execution/aggregate/TungstenAggregate.scala | 2 +- .../apache/spark/sql/execution/WholeStageCodegenSuite.scala | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 5ac96e74e1b82..23e54f344d252 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -168,7 +168,7 @@ case class TungstenAggregate( override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = { // only have DeclarativeAggregate val functions = aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate]) - // the model could be only Partial or PartialMerge + // the mode could be only Partial or PartialMerge val updateExpr = if (modes.contains(Partial)) { functions.flatMap(_.updateExpressions) } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 19c833e348c79..846be9dc2c2a6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.execution.aggregate.TungstenAggregate import org.apache.spark.sql.test.SharedSQLContext +import org.apache.spark.sql.functions.{avg, col, max} class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { @@ -39,11 +40,11 @@ class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext { } test("Aggregate should be included in WholeStageCodegen") { - val df = sqlContext.range(10).filter("id > 1").groupBy().count() + val df = sqlContext.range(10).groupBy().agg(max(col("id")), avg(col("id"))) val plan = df.queryExecution.executedPlan assert(plan.find(p => p.isInstanceOf[WholeStageCodegen] && p.asInstanceOf[WholeStageCodegen].plan.isInstanceOf[TungstenAggregate]).isDefined) - assert(df.collect() === Array(Row(8))) + assert(df.collect() === Array(Row(9, 4.5))) } } From 1beb7f19679b2b853835bbe86e2633d118c0adba Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 20 Jan 2016 13:25:14 -0800 Subject: [PATCH 5/5] Update WholeStageCodegenSuite.scala --- .../org/apache/spark/sql/execution/WholeStageCodegenSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala index 846be9dc2c2a6..300788c88ab2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.execution.aggregate.TungstenAggregate -import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.functions.{avg, col, max} +import org.apache.spark.sql.test.SharedSQLContext class WholeStageCodegenSuite extends SparkPlanTest with SharedSQLContext {