Skip to content

Commit

Permalink
[SPARK-11644][SQL] Remove the option to turn off unsafe and codegen.
Browse files Browse the repository at this point in the history
Author: Reynold Xin <rxin@databricks.com>

Closes #9618 from rxin/SPARK-11644.
  • Loading branch information
rxin committed Nov 11, 2015
1 parent 27029bc commit df97df2
Show file tree
Hide file tree
Showing 27 changed files with 257 additions and 494 deletions.
27 changes: 5 additions & 22 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -252,24 +252,8 @@ private[spark] object SQLConf {
"not be provided to ExchangeCoordinator.",
isPublic = false)

val TUNGSTEN_ENABLED = booleanConf("spark.sql.tungsten.enabled",
defaultValue = Some(true),
doc = "When true, use the optimized Tungsten physical execution backend which explicitly " +
"manages memory and dynamically generates bytecode for expression evaluation.")

val CODEGEN_ENABLED = booleanConf("spark.sql.codegen",
defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
doc = "When true, code will be dynamically generated at runtime for expression evaluation in" +
" a specific query.",
isPublic = false)

val UNSAFE_ENABLED = booleanConf("spark.sql.unsafe.enabled",
defaultValue = Some(true), // use TUNGSTEN_ENABLED as default
doc = "When true, use the new optimized Tungsten physical execution backend.",
isPublic = false)

val SUBEXPRESSION_ELIMINATION_ENABLED = booleanConf("spark.sql.subexpressionElimination.enabled",
defaultValue = Some(true), // use CODEGEN_ENABLED as default
defaultValue = Some(true),
doc = "When true, common subexpressions will be eliminated.",
isPublic = false)

Expand Down Expand Up @@ -475,6 +459,9 @@ private[spark] object SQLConf {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
val EXTERNAL_SORT = "spark.sql.planner.externalSort"
val USE_SQL_AGGREGATE2 = "spark.sql.useAggregate2"
val TUNGSTEN_ENABLED = "spark.sql.tungsten.enabled"
val CODEGEN_ENABLED = "spark.sql.codegen"
val UNSAFE_ENABLED = "spark.sql.unsafe.enabled"
}
}

Expand Down Expand Up @@ -541,14 +528,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)

private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))

def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)

private[spark] def unsafeEnabled: Boolean = getConf(UNSAFE_ENABLED, getConf(TUNGSTEN_ENABLED))

private[spark] def subexpressionEliminationEnabled: Boolean =
getConf(SUBEXPRESSION_ELIMINATION_ENABLED, codegenEnabled)
getConf(SUBEXPRESSION_ELIMINATION_ENABLED)

private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ case class Exchange(
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = {
unsafeEnabled && codegenEnabled && GenerateUnsafeProjection.canSupport(child.schema) &&
GenerateUnsafeProjection.canSupport(child.schema) &&
!newPartitioning.isInstanceOf[RangePartitioning]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
|${stringOrError(optimizedPlan)}
|== Physical Plan ==
|${stringOrError(executedPlan)}
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
""".stripMargin.trim
}
}
120 changes: 45 additions & 75 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
protected def sparkContext = sqlContext.sparkContext

// sqlContext will be null when we are being deserialized on the slaves. In this instance
// the value of codegenEnabled/unsafeEnabled will be set by the desserializer after the
// the value of subexpressionEliminationEnabled will be set by the desserializer after the
// constructor has run.
val codegenEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.codegenEnabled
} else {
false
}
val unsafeEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.unsafeEnabled
} else {
false
}
val subexpressionEliminationEnabled: Boolean = if (sqlContext != null) {
sqlContext.conf.subexpressionEliminationEnabled
} else {
Expand Down Expand Up @@ -233,83 +223,63 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ

protected def newProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate projection, fallback to interpret", e)
new InterpretedProjection(expressions, inputSchema)
}
}
} else {
new InterpretedProjection(expressions, inputSchema)
log.debug(s"Creating Projection: $expressions, inputSchema: $inputSchema")
try {
GenerateProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate projection, fallback to interpret", e)
new InterpretedProjection(expressions, inputSchema)
}
}
}

protected def newMutableProjection(
expressions: Seq[Expression],
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if(codegenEnabled) {
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
expressions: Seq[Expression], inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
try {
GenerateMutableProjection.generate(expressions, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate mutable projection, fallback to interpreted", e)
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
}

protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
if (codegenEnabled) {
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
} else {
InterpretedPredicate.create(expression, inputSchema)
try {
GeneratePredicate.generate(expression, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate predicate, fallback to interpreted", e)
InterpretedPredicate.create(expression, inputSchema)
}
}
}

protected def newOrdering(
order: Seq[SortOrder],
inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
if (codegenEnabled) {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
} else {
new InterpretedOrdering(order, inputSchema)
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
try {
GenerateOrdering.generate(order, inputSchema)
} catch {
case e: Exception =>
if (isTesting) {
throw e
} else {
log.error("Failed to generate ordering, fallback to interpreted", e)
new InterpretedOrdering(order, inputSchema)
}
}
}
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ import org.apache.spark.sql.execution.datasources.DataSourceStrategy
class SparkPlanner(val sqlContext: SQLContext) extends SparkStrategies {
val sparkContext: SparkContext = sqlContext.sparkContext

def codegenEnabled: Boolean = sqlContext.conf.codegenEnabled

def unsafeEnabled: Boolean = sqlContext.conf.unsafeEnabled

def numPartitions: Int = sqlContext.conf.numShufflePartitions

def strategies: Seq[Strategy] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
* if necessary.
*/
def getSortOperator(sortExprs: Seq[SortOrder], global: Boolean, child: SparkPlan): SparkPlan = {
if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
TungstenSort.supportsSchema(child.schema)) {
if (TungstenSort.supportsSchema(child.schema)) {
execution.TungstenSort(sortExprs, global, child)
} else {
execution.Sort(sortExprs, global, child)
Expand Down Expand Up @@ -368,8 +367,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.Project(projectList, child) =>
// If unsafe mode is enabled and we support these data types in Unsafe, use the
// Tungsten project. Otherwise, use the normal project.
if (sqlContext.conf.unsafeEnabled &&
UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
if (UnsafeProjection.canSupport(projectList) && UnsafeProjection.canSupport(child.schema)) {
execution.TungstenProject(projectList, planLater(child)) :: Nil
} else {
execution.Project(projectList, planLater(child)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,10 @@ object Utils {
resultExpressions: Seq[NamedExpression],
child: SparkPlan): Seq[SparkPlan] = {
// Check if we can use TungstenAggregate.
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
TungstenAggregate.supportsAggregate(
val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))


// 1. Create an Aggregate Operator for partial aggregations.

val groupingAttributes = groupingExpressions.map(_.toAttribute)
Expand Down Expand Up @@ -144,11 +141,9 @@ object Utils {
child: SparkPlan): Seq[SparkPlan] = {

val aggregateExpressions = functionsWithDistinct ++ functionsWithoutDistinct
val usesTungstenAggregate =
child.sqlContext.conf.unsafeEnabled &&
TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))
val usesTungstenAggregate = TungstenAggregate.supportsAggregate(
groupingExpressions,
aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes))

// functionsWithDistinct is guaranteed to be non-empty. Even though it may contain more than one
// DISTINCT aggregate function, all of those functions will have the same column expression.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,33 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.TUNGSTEN_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.TUNGSTEN_ENABLED} is deprecated and " +
s"will be ignored. Tungsten will continue to be used.")
Seq(Row(SQLConf.Deprecated.TUNGSTEN_ENABLED, "true"))
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.CODEGEN_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.CODEGEN_ENABLED} is deprecated and " +
s"will be ignored. Codegen will continue to be used.")
Seq(Row(SQLConf.Deprecated.CODEGEN_ENABLED, "true"))
}
(keyValueOutput, runFunc)

case Some((SQLConf.Deprecated.UNSAFE_ENABLED, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
logWarning(
s"Property ${SQLConf.Deprecated.UNSAFE_ENABLED} is deprecated and " +
s"will be ignored. Unsafe mode will continue to be used.")
Seq(Row(SQLConf.Deprecated.UNSAFE_ENABLED, "true"))
}
(keyValueOutput, runFunc)

// Configures a single property.
case Some((key, Some(value))) =>
val runFunc = (sqlContext: SQLContext) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@ trait HashJoin {
override def output: Seq[Attribute] = left.output ++ right.output

protected[this] def isUnsafeMode: Boolean = {
(self.codegenEnabled && self.unsafeEnabled
&& UnsafeProjection.canSupport(buildKeys)
&& UnsafeProjection.canSupport(self.schema))
UnsafeProjection.canSupport(buildKeys) && UnsafeProjection.canSupport(self.schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ trait HashOuterJoin {
}

protected[this] def isUnsafeMode: Boolean = {
(self.codegenEnabled && self.unsafeEnabled && joinType != FullOuter
&& UnsafeProjection.canSupport(buildKeys)
&& UnsafeProjection.canSupport(self.schema))
joinType != FullOuter &&
UnsafeProjection.canSupport(buildKeys) &&
UnsafeProjection.canSupport(self.schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,10 @@ trait HashSemiJoin {
override def output: Seq[Attribute] = left.output

protected[this] def supportUnsafe: Boolean = {
(self.codegenEnabled && self.unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(left.schema)
&& UnsafeProjection.canSupport(right.schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(left.schema) &&
UnsafeProjection.canSupport(right.schema)
}

override def outputsUnsafeRows: Boolean = supportUnsafe
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,9 @@ case class SortMergeJoin(
requiredOrders(leftKeys) :: requiredOrders(rightKeys) :: Nil

protected[this] def isUnsafeMode: Boolean = {
(codegenEnabled && unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,9 @@ case class SortMergeOuterJoin(
}

private def isUnsafeMode: Boolean = {
(codegenEnabled && unsafeEnabled
&& UnsafeProjection.canSupport(leftKeys)
&& UnsafeProjection.canSupport(rightKeys)
&& UnsafeProjection.canSupport(schema))
UnsafeProjection.canSupport(leftKeys) &&
UnsafeProjection.canSupport(rightKeys) &&
UnsafeProjection.canSupport(schema)
}

override def outputsUnsafeRows: Boolean = isUnsafeMode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,7 @@ trait HashJoinNode {
private[this] var joinKeys: Projection = _

protected def isUnsafeMode: Boolean = {
(codegenEnabled &&
unsafeEnabled &&
UnsafeProjection.canSupport(schema) &&
UnsafeProjection.canSupport(streamedKeys))
UnsafeProjection.canSupport(schema) && UnsafeProjection.canSupport(streamedKeys)
}

private def streamSideKeyGenerator: Projection = {
Expand Down
Loading

0 comments on commit df97df2

Please sign in to comment.