Skip to content

Commit

Permalink
[SPARK-13215] [SQL] remove fallback in codegen
Browse files Browse the repository at this point in the history
Since we remove the configuration for codegen, we are heavily reply on codegen (also TungstenAggregate require the generated MutableProjection to update UnsafeRow), should remove the fallback, which could make user confusing, see the discussion in SPARK-13116.

Author: Davies Liu <davies@databricks.com>

Closes #11097 from davies/remove_fallback.
  • Loading branch information
Davies Liu authored and davies committed Feb 5, 2016
1 parent 0bb5b73 commit 875f507
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}

protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}

protected def newOrdering(
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
GenerateOrdering.generate(order, inputSchema)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
try {
GenerateMutableProjection.generate(children, inputAttributes)()
} catch {
case e: Exception =>
log.error("Failed to generate mutable projection, fallback to interpreted", e)
new InterpretedMutableProjection(children, inputAttributes)
}
GenerateMutableProjection.generate(children, inputAttributes)()
}

private[this] lazy val inputToScalaConverters: Any => Any =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.local

import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
Expand Down Expand Up @@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema)
}

protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}
}

Expand Down

0 comments on commit 875f507

Please sign in to comment.