Skip to content

Commit

Permalink
[SPARK-13293][SQL] generate Expand
Browse files Browse the repository at this point in the history
Expand suffers from creating the UnsafeRow from the same input multiple times; with codegen, it only needs to copy some of the columns.

After this, we can see a 3X improvement (from 43 seconds to 13 seconds) on a TPCDS query (Q67) that has eight columns in Rollup.

Ideally, we could mask some of the columns based on a bitmask. I'd leave that for the future, because currently Aggregation (50 ns) is much slower than just copying the variables (1-2 ns).

Author: Davies Liu <davies@databricks.com>

Closes #11177 from davies/gen_expand.
  • Loading branch information
Davies Liu authored and rxin committed Feb 13, 2016
1 parent 62b1c07 commit 2228f07
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 1 deletion.
124 changes: 123 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/Expand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.spark.sql.execution

import scala.collection.immutable.IndexedSeq

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.metric.SQLMetrics

/**
* Apply all of the GroupExpressions to every input row, hence we will get
Expand All @@ -35,7 +39,10 @@ case class Expand(
projections: Seq[Seq[Expression]],
output: Seq[Attribute],
child: SparkPlan)
extends UnaryNode {
extends UnaryNode with CodegenSupport {

// SQL metrics exposed by this operator. "numOutputRows" is incremented once per emitted row,
// both in the interpreted path (doExecute) and in the generated code (doConsume).
private[sql] override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

// The GroupExpressions can output data with arbitrary partitioning, so set it
// as UNKNOWN partitioning
Expand All @@ -48,6 +55,8 @@ case class Expand(
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
val numOutputRows = longMetric("numOutputRows")

child.execute().mapPartitions { iter =>
val groups = projections.map(projection).toArray
new Iterator[InternalRow] {
Expand All @@ -71,9 +80,122 @@ case class Expand(
idx = 0
}

numOutputRows += 1
result
}
}
}
}

/**
 * Delegates to the child's `upstream()`.
 *
 * The child is cast to [[CodegenSupport]] unconditionally, so a child that does not mix in
 * that trait fails with a `ClassCastException`.
 */
override def upstream(): RDD[InternalRow] = {
  val codegenChild = child.asInstanceOf[CodegenSupport]
  codegenChild.upstream()
}

/**
 * Asks the child to produce its code, registering this operator as the consumer of the
 * child's output.
 *
 * The child is cast to [[CodegenSupport]] unconditionally, so a child that does not mix in
 * that trait fails with a `ClassCastException`.
 *
 * @param ctx the codegen context shared by the operators being compiled together
 * @return the code returned by the child's `produce`
 */
protected override def doProduce(ctx: CodegenContext): String = {
  val codegenChild = child.asInstanceOf[CodegenSupport]
  codegenChild.produce(ctx, this)
}

/**
 * Generates the code that turns one input row into `projections.length` output rows.
 *
 * Columns whose expression is identical in every projection are evaluated once, before the
 * loop; all other columns are declared up front and re-assigned inside a `switch` on the
 * projection index.
 *
 * @param ctx   the codegen context; its `currentVars` is set to `input` before any expression
 *              code is generated
 * @param input the generated variables for the columns of the child's current row
 * @return the generated source for this operator, including the downstream consume code
 */
override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
  /*
   * When the projections list looks like:
   *   expr1A, exprB, expr1C
   *   expr2A, exprB, expr2C
   *   ...
   *   expr(N-1)A, exprB, expr(N-1)C
   *
   * i.e. column A and C have different values for each output row, but column B stays constant.
   *
   * The generated code looks something like (note that B is only computed once in declaration):
   *
   *   // part 1: declare all the columns
   *   colA = ...
   *   colB = ...
   *   colC = ...
   *
   *   // part 2: code that computes the columns
   *   for (row = 0; row < N; row++) {
   *     switch (row) {
   *       case 0:
   *         colA = ...
   *         colC = ...
   *         break;
   *       case 1:
   *         colA = ...
   *         colC = ...
   *         break;
   *       ...
   *       case N - 1:
   *         colA = ...
   *         colC = ...
   *         break;
   *     }
   *     // increment metrics and consume output values
   *   }
   *
   * We use a for loop here so we only include one copy of the consume code and avoid code
   * size explosion.
   */

  // Set input variables
  ctx.currentVars = input

  // Tracks whether a column has the same output for all rows.
  // The array has one entry per output column (i.e. its size is output.length, not the number
  // of projections N).
  // If sameOutput(i) is true, then the i-th column has the same value for all output rows given
  // an input row.
  val sameOutput: Array[Boolean] = output.indices.map { colIndex =>
    projections.map(p => p(colIndex)).toSet.size == 1
  }.toArray

  // Part 1: declare variables for each column
  // If a column has the same value for all output rows, then we also generate its computation
  // right after declaration. Otherwise its value is computed in part 2.
  val outputColumns = output.indices.map { col =>
    val firstExpr = projections.head(col)
    if (sameOutput(col)) {
      // This column is the same across all output rows. Just generate code for it here.
      BindReferences.bindReference(firstExpr, child.output).gen(ctx)
    } else {
      // Only declare the variable here, initialized to null / the type's default value; the
      // switch below assigns the real value for each projection.
      val isNull = ctx.freshName("isNull")
      val value = ctx.freshName("value")
      val code = s"""
        |boolean $isNull = true;
        |${ctx.javaType(firstExpr.dataType)} $value = ${ctx.defaultValue(firstExpr.dataType)};
       """.stripMargin
      ExprCode(code, isNull, value)
    }
  }

  // Part 2: switch/case statements
  // One case per projection; each case only re-assigns the columns that vary between
  // projections. Columns are visited in order so expression code is generated in the same
  // sequence as a plain loop over the column indices would.
  val cases = projections.zipWithIndex.map { case (exprs, row) =>
    val updateCode = exprs.indices
      .filterNot(col => sameOutput(col))
      .map { col =>
        val ev = BindReferences.bindReference(exprs(col), child.output).gen(ctx)
        s"""
           |${ev.code}
           |${outputColumns(col).isNull} = ${ev.isNull};
           |${outputColumns(col).value} = ${ev.value};
         """.stripMargin
      }
      .mkString

    s"""
       |case $row:
       |  ${updateCode.trim}
       |  break;
     """.stripMargin
  }

  val numOutput = metricTerm(ctx, "numOutputRows")
  val i = ctx.freshName("i")
  s"""
     |${outputColumns.map(_.code).mkString("\n").trim}
     |for (int $i = 0; $i < ${projections.length}; $i ++) {
     |  switch ($i) {
     |    ${cases.mkString("\n").trim}
     |  }
     |  $numOutput.add(1);
     |  ${consume(ctx, outputColumns)}
     |}
   """.stripMargin
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,23 @@ class BenchmarkWholeStageCodegen extends SparkFunSuite {

}

// Benchmarks a two-key `cube` aggregation with whole-stage codegen off and on.
// Registered with `ignore`, so it is not executed as part of the regular test run.
ignore("cube") {
  val N = 5 << 20

  runBenchmark("cube", N) {
    sqlContext.range(N).selectExpr("id", "id % 1000 as k1", "id & 256 as k2")
      .cube("k1", "k2").sum("id").collect()
  }

  /**
    Intel(R) Core(TM) i7-4558U CPU @ 2.80GHz
    cube:                               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    -------------------------------------------------------------------------------------------
    cube codegen=false                       3188 / 3392          1.6         608.2       1.0X
    cube codegen=true                        1239 / 1394          4.2         236.3       2.6X
    */
}

ignore("hash and BytesToBytesMap") {
val N = 50 << 20

Expand Down

0 comments on commit 2228f07

Please sign in to comment.