Revert "[SPARK-13031] [SQL] cleanup codegen and improve test coverage"
This reverts commit cc18a71.
davies committed Jan 29, 2016
1 parent 4637fc0 commit b9dfdcc
Showing 9 changed files with 202 additions and 334 deletions.
@@ -144,23 +144,14 @@ class CodegenContext {
 
   private val curId = new java.util.concurrent.atomic.AtomicInteger()
 
-  /**
-   * A prefix used to generate fresh name.
-   */
-  var freshNamePrefix = ""
-
   /**
    * Returns a term name that is unique within this instance of a `CodeGenerator`.
    *
   * (Since we aren't in a macro context we do not seem to have access to the built in `freshName`
    * function.)
    */
-  def freshName(name: String): String = {
-    if (freshNamePrefix == "") {
-      s"$name${curId.getAndIncrement}"
-    } else {
-      s"${freshNamePrefix}_$name${curId.getAndIncrement}"
-    }
+  def freshName(prefix: String): String = {
+    s"$prefix${curId.getAndIncrement}"
   }
 
   /**
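
The restored freshName depends on nothing but a per-instance counter. A minimal, self-contained sketch of that scheme (FreshNames and the demo values are illustrative, not Spark code):

    import java.util.concurrent.atomic.AtomicInteger

    // Illustrative stand-in for CodegenContext's naming scheme after the revert:
    // names are unique per instance because of the counter alone, with no
    // per-operator prefix.
    class FreshNames {
      private val curId = new AtomicInteger()
      def freshName(prefix: String): String = s"$prefix${curId.getAndIncrement}"
    }

    object FreshNamesDemo extends App {
      val ctx = new FreshNames
      println(ctx.freshName("row"))   // row0
      println(ctx.freshName("row"))   // row1
      println(ctx.freshName("value")) // value2
    }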
@@ -93,7 +93,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
       // Can't call setNullAt on DecimalType, because we need to keep the offset
       s"""
         if (this.isNull_$i) {
-          ${ctx.setColumn("mutableRow", e.dataType, i, "null")};
+          ${ctx.setColumn("mutableRow", e.dataType, i, null)};
         } else {
           ${ctx.setColumn("mutableRow", e.dataType, i, s"this.value_$i")};
         }
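
This hunk swaps the string literal "null" back to a plain null reference. In the emitted Java the two are identical, because Scala string interpolation renders a null reference as the text null. A quick check, using a hypothetical stand-in for ctx.setColumn:

    // NullLiteralDemo and setColumn are hypothetical; they only mimic how
    // ctx.setColumn splices a value expression into generated Java.
    object NullLiteralDemo extends App {
      def setColumn(row: String, ordinal: Int, value: String): String =
        s"$row.update($ordinal, $value)"

      // Both calls emit the same Java text: interpolation renders a null
      // reference as the four characters "null".
      println(setColumn("mutableRow", 0, null))    // mutableRow.update(0, null)
      println(setColumn("mutableRow", 0, "null"))  // mutableRow.update(0, null)
    }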
@@ -22,11 +22,9 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, LeafExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen._
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.util.Utils
 
 /**
  * An interface for those physical operators that support codegen.
@@ -44,16 +42,10 @@ trait CodegenSupport extends SparkPlan {
   private var parent: CodegenSupport = null
 
   /**
-   * Returns the RDD of InternalRow which generates the input rows.
+   * Returns an input RDD of InternalRow and Java source code to process them.
    */
-  def upstream(): RDD[InternalRow]
-
-  /**
-   * Returns Java source code to process the rows from upstream.
-   */
-  def produce(ctx: CodegenContext, parent: CodegenSupport): String = {
+  def produce(ctx: CodegenContext, parent: CodegenSupport): (RDD[InternalRow], String) = {
     this.parent = parent
-    ctx.freshNamePrefix = nodeName
     doProduce(ctx)
   }
 
@@ -74,41 +66,16 @@ trait CodegenSupport extends SparkPlan {
    *     # call consume(), which will call parent.doConsume()
    *   }
    */
-  protected def doProduce(ctx: CodegenContext): String
+  protected def doProduce(ctx: CodegenContext): (RDD[InternalRow], String)
 
   /**
-   * Consume the columns generated from current SparkPlan, call it's parent.
+   * Consume the columns generated from current SparkPlan, call it's parent or create an iterator.
    */
-  def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = {
-    if (input != null) {
-      assert(input.length == output.length)
-    }
-    parent.consumeChild(ctx, this, input, row)
+  protected def consume(ctx: CodegenContext, columns: Seq[ExprCode]): String = {
+    assert(columns.length == output.length)
+    parent.doConsume(ctx, this, columns)
   }
 
-  /**
-   * Consume the columns generated from it's child, call doConsume() or emit the rows.
-   */
-  def consumeChild(
-      ctx: CodegenContext,
-      child: SparkPlan,
-      input: Seq[ExprCode],
-      row: String = null): String = {
-    ctx.freshNamePrefix = nodeName
-    if (row != null) {
-      ctx.currentVars = null
-      ctx.INPUT_ROW = row
-      val evals = child.output.zipWithIndex.map { case (attr, i) =>
-        BoundReference(i, attr.dataType, attr.nullable).gen(ctx)
-      }
-      s"""
-         | ${evals.map(_.code).mkString("\n")}
-         | ${doConsume(ctx, evals)}
-       """.stripMargin
-    } else {
-      doConsume(ctx, input)
-    }
-  }
-
   /**
    * Generate the Java source code to process the rows from child SparkPlan.
@@ -122,9 +89,7 @@ trait CodegenSupport extends SparkPlan {
    *     # call consume(), which will call parent.doConsume()
    *   }
    */
-  protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = {
-    throw new UnsupportedOperationException
-  }
+  def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String
 }
 
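
To make the restored doConsume() contract concrete: the parent operator receives the child's generated column code and values (Spark's ExprCode) and returns the Java fragment to splice into the shared loop. A toy sketch under that assumption, with illustrative names (ExprSnippet, ToyProject), not Spark classes:

    // ExprSnippet mimics ExprCode: the Java that evaluates one column, plus
    // the variable holding its result.
    object ToyConsumeDemo extends App {
      final case class ExprSnippet(code: String, value: String)

      trait ToyConsumer {
        // The parent returns the Java fragment that handles one row's columns.
        def doConsume(columns: Seq[ExprSnippet]): String
      }

      // A projection-like parent: splice in each column's evaluation code,
      // then hand the resulting values onward.
      object ToyProject extends ToyConsumer {
        def doConsume(columns: Seq[ExprSnippet]): String =
          s"""${columns.map(_.code).mkString("\n")}
             |process(${columns.map(_.value).mkString(", ")});""".stripMargin
      }

      val cols = Seq(
        ExprSnippet("int value1 = row.getInt(0);", "value1"),
        ExprSnippet("long value2 = row.getLong(1);", "value2"))
      println(ToyProject.doConsume(cols))
      // int value1 = row.getInt(0);
      // long value2 = row.getLong(1);
      // process(value1, value2);
    }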
@@ -137,36 +102,31 @@ trait CodegenSupport extends SparkPlan {
 case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
 
   override def output: Seq[Attribute] = child.output
-  override def outputPartitioning: Partitioning = child.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
-
-  override def doPrepare(): Unit = {
-    child.prepare()
-  }
-
-  override def doExecute(): RDD[InternalRow] = {
-    child.execute()
-  }
 
-  override def supportCodegen: Boolean = false
+  override def supportCodegen: Boolean = true
 
-  override def upstream(): RDD[InternalRow] = {
-    child.execute()
-  }
-
-  override def doProduce(ctx: CodegenContext): String = {
+  override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = {
     val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true))
     val row = ctx.freshName("row")
     ctx.INPUT_ROW = row
     ctx.currentVars = null
     val columns = exprs.map(_.gen(ctx))
-    s"""
+    val code = s"""
       | while (input.hasNext()) {
       |   InternalRow $row = (InternalRow) input.next();
       |   ${columns.map(_.code).mkString("\n")}
       |   ${consume(ctx, columns)}
       | }
     """.stripMargin
+    (child.execute(), code)
   }
 
+  def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
+    throw new UnsupportedOperationException
+  }
+
+  override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException
+  }
 
   override def simpleString: String = "INPUT"
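
Written out by hand, the loop InputAdapter emits simply drains the partition iterator and pushes each row into the fused pipeline. A sketch over a plain Scala Iterator, where Row and consumeRow are illustrative stand-ins for InternalRow and for the code returned by consume(ctx, columns):

    object InputAdapterByHand extends App {
      type Row = Array[Any]

      def consumeRow(row: Row): Unit = println(row.mkString(", "))

      def processPartition(input: Iterator[Row]): Unit = {
        while (input.hasNext) {   // mirrors: while (input.hasNext()) {
          val row = input.next()  //   InternalRow row1 = (InternalRow) input.next();
          consumeRow(row)         //   ${consume(ctx, columns)}
        }                         // }
      }

      processPartition(Iterator(Array[Any](1, "a"), Array[Any](2, "b")))
    }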
@@ -183,20 +143,16 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
  *
  *   -> execute()
  *       |
- *    doExecute() --------->   upstream() -------> upstream() ------> execute()
- *       |
- *        -----------------> produce()
- *                             |
- *                          doProduce()  -------> produce()
- *                                                   |
- *                                                doProduce()
- *                                                   |
- *                                                consume()
- *                          consumeChild() <-----------|
- *                             |
- *                          doConsume()
- *                             |
- *                        consumeChild() <----- consume()
+ *    doExecute() --------> produce()
+ *                             |
+ *                          doProduce() ---> execute()
+ *                                              |
+ *                                           consume()
+ *                          doConsume() ------------|
+ *                             |
+ *                          doConsume() <----- consume()
  *
  * SparkPlan A should override doProduce() and doConsume().
  *
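
The diagram can also be traced in executable form. The toy below (plain strings instead of real codegen; Node and the three anonymous operators are illustrative, not Spark classes) shows a leaf that owns the loop, a middle operator that guards it with a predicate, and a root that emits the row, so the three produce/consume calls fuse into one while loop:

    object CallGraphDemo extends App {
      // Node is a toy stand-in for CodegenSupport, operating on plain strings.
      trait Node {
        var parent: Node = null
        def produce(p: Node): String = { parent = p; doProduce() }
        def doProduce(): String = throw new UnsupportedOperationException
        def doConsume(row: String): String = throw new UnsupportedOperationException
      }

      // Leaf: owns the loop, like InputAdapter's doProduce().
      val source = new Node {
        override def doProduce() =
          s"while (input.hasNext()) { InternalRow r = input.next(); ${parent.doConsume("r")} }"
      }

      // Middle operator: wraps the downstream code in a predicate, like a
      // code-generated Filter implementing doConsume().
      val filter: Node = new Node {
        override def doProduce() = source.produce(this)
        override def doConsume(row: String) = s"if ($row.isValid()) { ${parent.doConsume(row)} }"
      }

      // Root: plays WholeStageCodegen, emitting the row to the caller.
      val sink = new Node {
        override def doConsume(row: String) = s"currentRow = $row; return;"
      }

      println(filter.produce(sink))
      // while (input.hasNext()) { InternalRow r = input.next();
      //   if (r.isValid()) { currentRow = r; return; } }
    }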
@@ -206,48 +162,37 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport {
 case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
   extends SparkPlan with CodegenSupport {
 
+  override def supportCodegen: Boolean = false
+
   override def output: Seq[Attribute] = plan.output
-  override def outputPartitioning: Partitioning = plan.outputPartitioning
-  override def outputOrdering: Seq[SortOrder] = plan.outputOrdering
-
-  override def doPrepare(): Unit = {
-    plan.prepare()
-  }
 
   override def doExecute(): RDD[InternalRow] = {
     val ctx = new CodegenContext
-    val code = plan.produce(ctx, this)
+    val (rdd, code) = plan.produce(ctx, this)
     val references = ctx.references.toArray
     val source = s"""
      public Object generate(Object[] references) {
        return new GeneratedIterator(references);
      }
 
      class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
 
        private Object[] references;
        ${ctx.declareMutableStates()}
-       ${ctx.declareAddedFunctions()}
 
        public GeneratedIterator(Object[] references) {
          this.references = references;
          ${ctx.initMutableStates()}
        }
 
-       protected void processNext() throws java.io.IOException {
+       protected void processNext() {
          $code
        }
      }
      """
 
     // try to compile, helpful for debug
     // println(s"${CodeFormatter.format(source)}")
     CodeGenerator.compile(source)
 
-    plan.upstream().mapPartitions { iter =>
+    rdd.mapPartitions { iter =>
       val clazz = CodeGenerator.compile(source)
       val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
       buffer.setInput(iter)
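
Note the restored shape of doExecute(): the Java source is generated once on the driver, then compiled again inside mapPartitions so that each task builds its own iterator. A hedged sketch of that pattern in plain Scala, where compile is a dummy stand-in for CodeGenerator.compile and the class caching the real code relies on is omitted:

    // PerPartitionCompileDemo is generic Scala, not Spark: a Seq of iterators
    // stands in for rdd.mapPartitions.
    final class GeneratedLikeIterator(input: Iterator[Int]) extends Iterator[Int] {
      def hasNext: Boolean = input.hasNext
      def next(): Int = input.next() * 2 // stands in for the fused pipeline body
    }

    object PerPartitionCompileDemo extends App {
      // Pretend this parses and compiles `source`; the real compiler (Janino)
      // also caches generated classes, which this sketch omits.
      def compile(source: String): Iterator[Int] => Iterator[Int] =
        in => new GeneratedLikeIterator(in)

      val source = "class GeneratedIterator { ... }" // built once, on the "driver"
      val partitions = Seq(Iterator(1, 2), Iterator(3))

      // Inside rdd.mapPartitions { iter => ... } every task compiles the
      // shipped source and wraps its own partition iterator with the result.
      val results = partitions.map(iter => compile(source)(iter).toList)
      println(results) // List(List(2, 4), List(6))
    }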
@@ -258,47 +203,29 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     }
   }
 
-  override def upstream(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException
-  }
-
-  override def doProduce(ctx: CodegenContext): String = {
+  override def doProduce(ctx: CodegenContext): (RDD[InternalRow], String) = {
     throw new UnsupportedOperationException
   }
 
-  override def consumeChild(
-      ctx: CodegenContext,
-      child: SparkPlan,
-      input: Seq[ExprCode],
-      row: String = null): String = {
-
-    if (row != null) {
-      // There is an UnsafeRow already
-      s"""
-         | currentRow = $row;
-         | return;
-       """.stripMargin
-    } else {
-      assert(input != null)
-      if (input.nonEmpty) {
-        val colExprs = output.zipWithIndex.map { case (attr, i) =>
-          BoundReference(i, attr.dataType, attr.nullable)
-        }
-        // generate the code to create a UnsafeRow
-        ctx.currentVars = input
-        val code = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
-        s"""
-           | ${code.code.trim}
-           | currentRow = ${code.value};
-           | return;
-         """.stripMargin
-      } else {
-        // There is no columns
-        s"""
-           | currentRow = unsafeRow;
-           | return;
-         """.stripMargin
-      }
-    }
-  }
+  override def doConsume(ctx: CodegenContext, child: SparkPlan, input: Seq[ExprCode]): String = {
+    if (input.nonEmpty) {
+      val colExprs = output.zipWithIndex.map { case (attr, i) =>
+        BoundReference(i, attr.dataType, attr.nullable)
+      }
+      // generate the code to create a UnsafeRow
+      ctx.currentVars = input
+      val code = GenerateUnsafeProjection.createCode(ctx, colExprs, false)
+      s"""
+         | ${code.code.trim}
+         | currentRow = ${code.value};
+         | return;
+       """.stripMargin
+    } else {
+      // There is no columns
+      s"""
+         | currentRow = unsafeRow;
+         | return;
+       """.stripMargin
+    }
+  }

@@ -319,7 +246,7 @@ case class WholeStageCodegen(plan: CodegenSupport, children: Seq[SparkPlan])
     builder.append(simpleString)
     builder.append("\n")
 
-    plan.generateTreeString(depth + 2, lastChildren :+ false :+ true, builder)
+    plan.generateTreeString(depth + 1, lastChildren :+ children.isEmpty :+ true, builder)
     if (children.nonEmpty) {
       children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
       children.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
@@ -359,14 +286,13 @@ private[sql] case class CollapseCodegenStages(sqlContext: SQLContext) extends Ru
       case plan: CodegenSupport if supportCodegen(plan) &&
         // Whole stage codegen is only useful when there are at least two levels of operators that
         // support it (save at least one projection/iterator).
-        (Utils.isTesting || plan.children.exists(supportCodegen)) =>
+        plan.children.exists(supportCodegen) =>
 
         var inputs = ArrayBuffer[SparkPlan]()
         val combined = plan.transform {
           case p if !supportCodegen(p) =>
-            val input = apply(p) // collapse them recursively
-            inputs += input
-            InputAdapter(input)
+            inputs += p
+            InputAdapter(p)
         }.asInstanceOf[CodegenSupport]
         WholeStageCodegen(combined, inputs)
     }
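
Reduced to a toy plan tree, the restored rule cuts the tree at operators that do not support codegen, records them as stage inputs, and puts an adapter in their place; note that, unlike the removed version, it does not recurse into the cut-off subtrees. All names below are illustrative, not Spark classes:

    object CollapseDemo extends App {
      sealed trait ToyPlan { def children: List[ToyPlan] }
      case class CodegenOp(name: String, children: List[ToyPlan]) extends ToyPlan
      case class PlainOp(name: String) extends ToyPlan { def children = Nil }
      case class Adapter(child: ToyPlan) extends ToyPlan { def children = List(child) }
      case class Stage(root: ToyPlan, inputs: List[ToyPlan]) extends ToyPlan {
        def children = inputs
      }

      def supports(p: ToyPlan): Boolean = p.isInstanceOf[CodegenOp]

      def collapse(plan: ToyPlan): ToyPlan = plan match {
        // As in the restored rule, fuse only when some child also supports
        // codegen; otherwise a single-operator stage would buy nothing.
        case p: CodegenOp if p.children.exists(supports) =>
          var inputs = List.empty[ToyPlan]
          def cut(n: ToyPlan): ToyPlan = n match {
            case c: CodegenOp => CodegenOp(c.name, c.children.map(cut))
            case other =>
              inputs ::= other // remember the real child as a stage input
              Adapter(other)   // and splice an adapter into the fused tree
          }
          Stage(cut(p), inputs.reverse)
        case other => other
      }

      val plan = CodegenOp("Project", List(CodegenOp("Filter", List(PlainOp("Scan")))))
      println(collapse(plan))
      // Stage(CodegenOp(Project,List(CodegenOp(Filter,List(Adapter(PlainOp(Scan)))))),
      //       List(PlainOp(Scan)))
    }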