Skip to content

Commit

Permalink
remove fallback in codegen
Browse files Browse the repository at this point in the history
  • Loading branch information
Davies Liu committed Feb 5, 2016
1 parent 138c300 commit 4e9ce00
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}

protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}

protected def newOrdering(
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
GenerateOrdering.generate(order, inputSchema)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
try {
GenerateMutableProjection.generate(children, inputAttributes)()
} catch {
case e: Exception =>
log.error("Failed to generate mutable projection, fallback to interpreted", e)
new InterpretedMutableProjection(children, inputAttributes)
}
GenerateMutableProjection.generate(children, inputAttributes)()
}

private[this] lazy val inputToScalaConverters: Any => Any =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.local

import scala.util.control.NonFatal

import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
Expand Down Expand Up @@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
GenerateMutableProjection.generate(expressions, inputSchema)
}

protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case NonFatal(e) =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
GeneratePredicate.generate(expression, inputSchema)
}
}

Expand Down

0 comments on commit 4e9ce00

Please sign in to comment.