Skip to content

Commit

Permalink
[SPARK-25358][SQL] MutableProjection supports fallback to an interpre…
Browse files Browse the repository at this point in the history
…ted mode

## What changes were proposed in this pull request?
In SPARK-23711, `UnsafeProjection` supports fallback to an interpreted mode. Therefore, this pr fixed code to support the same fallback mode in `MutableProjection` based on `CodeGeneratorWithInterpretedFallback`.

## How was this patch tested?
Added tests in `CodeGeneratorWithInterpretedFallbackSuite`.

Closes #22355 from maropu/SPARK-25358.

Authored-by: Takeshi Yamamuro <yamamuro@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
maropu authored and cloud-fan committed Sep 19, 2018
1 parent 5534a3a commit 12b1e91
Show file tree
Hide file tree
Showing 11 changed files with 201 additions and 87 deletions.
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp


/**
* A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
* expressions.
*
* @param expressions a sequence of expressions that determine the value of each column of the
* output row.
*/
class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(toBoundExprs(expressions, inputSchema))

private[this] val buffer = new Array[Any](expressions.size)

override def initialize(partitionIndex: Int): Unit = {
expressions.foreach(_.foreach {
case n: Nondeterministic => n.initialize(partitionIndex)
case _ =>
})
}

private[this] val validExprs = expressions.zipWithIndex.filter {
case (NoOp, _) => false
case _ => true
}
private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size)
def currentValue: InternalRow = mutableRow

override def target(row: InternalRow): MutableProjection = {
mutableRow = row
this
}

override def apply(input: InternalRow): InternalRow = {
var i = 0
while (i < validExprs.length) {
val (expr, ordinal) = validExprs(i)
// Store the result into buffer first, to make the projection atomic (needed by aggregation)
buffer(ordinal) = expr.eval(input)
i += 1
}
i = 0
while (i < validExprs.length) {
val (_, ordinal) = validExprs(i)
mutableRow(ordinal) = buffer(ordinal)
i += 1
}
mutableRow
}
}

/**
* Helper functions for creating an [[InterpretedMutableProjection]].
*/
object InterpretedMutableProjection {

/**
* Returns a [[MutableProjection]] for given sequence of bound Expressions.
*/
def createProjection(exprs: Seq[Expression]): MutableProjection = {
// We need to make sure that we do not reuse stateful expressions.
val cleanedExpressions = exprs.map(_.transform {
case s: Stateful => s.freshCopy()
})
new InterpretedMutableProjection(cleanedExpressions)
}
}
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateSafeProjection, GenerateUnsafeProjection}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructType}

Expand Down Expand Up @@ -56,47 +56,50 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
}

/**
* A [[MutableProjection]] that is calculated by calling `eval` on each of the specified
* expressions.
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
* column of the new row. If the schema of the input row is specified, then the given expression
* will be bound to that schema.
*
* @param expressions a sequence of expressions that determine the value of each column of the
* output row.
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
* each time an input row is added. This significantly reduces the cost of calculating the
* projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
* `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
* `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
*/
case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))
abstract class MutableProjection extends Projection {
def currentValue: InternalRow

private[this] val buffer = new Array[Any](expressions.size)
/** Uses the given row to store the output of the projection. */
def target(row: InternalRow): MutableProjection
}

override def initialize(partitionIndex: Int): Unit = {
expressions.foreach(_.foreach {
case n: Nondeterministic => n.initialize(partitionIndex)
case _ =>
})
/**
* The factory object for `MutableProjection`.
*/
object MutableProjection
extends CodeGeneratorWithInterpretedFallback[Seq[Expression], MutableProjection] {

override protected def createCodeGeneratedObject(in: Seq[Expression]): MutableProjection = {
GenerateMutableProjection.generate(in, SQLConf.get.subexpressionEliminationEnabled)
}

private[this] val exprArray = expressions.toArray
private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
def currentValue: InternalRow = mutableRow
override protected def createInterpretedObject(in: Seq[Expression]): MutableProjection = {
InterpretedMutableProjection.createProjection(in)
}

override def target(row: InternalRow): MutableProjection = {
mutableRow = row
this
/**
* Returns an MutableProjection for given sequence of bound Expressions.
*/
def create(exprs: Seq[Expression]): MutableProjection = {
createObject(exprs)
}

override def apply(input: InternalRow): InternalRow = {
var i = 0
while (i < exprArray.length) {
// Store the result into buffer first, to make the projection atomic (needed by aggregation)
buffer(i) = exprArray(i).eval(input)
i += 1
}
i = 0
while (i < exprArray.length) {
mutableRow(i) = buffer(i)
i += 1
}
mutableRow
/**
* Returns an MutableProjection for given sequence of Expressions, which will be bound to
* `inputSchema`.
*/
def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): MutableProjection = {
create(toBoundExprs(exprs, inputSchema))
}
}

Expand All @@ -123,12 +126,6 @@ object UnsafeProjection
InterpretedUnsafeProjection.createProjection(in)
}

protected def toBoundExprs(
exprs: Seq[Expression],
inputSchema: Seq[Attribute]): Seq[Expression] = {
exprs.map(BindReferences.bindReference(_, inputSchema))
}

protected def toUnsafeExprs(exprs: Seq[Expression]): Seq[Expression] = {
exprs.map(_ transform {
case CreateNamedStruct(children) => CreateNamedStructUnsafe(children)
Expand Down
Expand Up @@ -44,6 +44,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
create(canonicalize(bind(expressions, inputSchema)), useSubexprElimination)
}

def generate(expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = {
create(canonicalize(expressions), useSubexprElimination)
}

protected def create(expressions: Seq[Expression]): MutableProjection = {
create(expressions, false)
}
Expand Down
Expand Up @@ -86,24 +86,12 @@ package object expressions {
}

/**
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
* column of the new row. If the schema of the input row is specified, then the given expression
* will be bound to that schema.
*
* In contrast to a normal projection, a MutableProjection reuses the same underlying row object
* each time an input row is added. This significantly reduces the cost of calculating the
* projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after
* `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call
* `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`.
* A helper function to bind given expressions to an input schema.
*/
abstract class MutableProjection extends Projection {
def currentValue: InternalRow

/** Uses the given row to store the output of the projection. */
def target(row: InternalRow): MutableProjection
def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = {
exprs.map(BindReferences.bindReference(_, inputSchema))
}


/**
* Helper functions for working with `Seq[Attribute]`.
*/
Expand Down
Expand Up @@ -20,13 +20,18 @@ package org.apache.spark.sql.catalyst.expressions
import java.util.concurrent.ExecutionException

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.catalyst.plans.PlanTestBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.{IntegerType, StructType}

class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase {

val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString

object FailedCodegenProjection
extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] {

Expand All @@ -44,19 +49,30 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT

test("UnsafeProjection with codegen factory mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
val obj = UnsafeProjection.createObject(input)
assert(obj.getClass.getName.contains("GeneratedClass$SpecificUnsafeProjection"))
}

val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
val obj = UnsafeProjection.createObject(input)
assert(obj.isInstanceOf[InterpretedUnsafeProjection])
}
}

test("MutableProjection with codegen factory mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
val obj = MutableProjection.createObject(input)
assert(obj.getClass.getName.contains("GeneratedClass$SpecificMutableProjection"))
}

withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
val obj = MutableProjection.createObject(input)
assert(obj.isInstanceOf[InterpretedMutableProjection])
}
}

test("fallback to the interpreter mode") {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val fallback = CodegenObjectFactoryMode.FALLBACK.toString
Expand All @@ -69,11 +85,25 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT
test("codegen failures in the CODEGEN_ONLY mode") {
val errMsg = intercept[ExecutionException] {
val input = Seq(BoundReference(0, IntegerType, nullable = true))
val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
FailedCodegenProjection.createObject(input)
}
}.getMessage
assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:"))
}

test("SPARK-25358 Correctly handles NoOp in MutableProjection") {
val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp)
val input = InternalRow.fromSeq(1 :: 1 :: Nil)
val expected = 2 :: null :: Nil
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) {
val proj = MutableProjection.createObject(exprs)
assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
}

withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) {
val proj = MutableProjection.createObject(exprs)
assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected)
}
}
}
Expand Up @@ -1510,16 +1510,16 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper
val seed1 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) ===
evaluateWithoutCodegen(Shuffle(ai0, seed1)))
assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) ===
evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)))
assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) ===
evaluateWithMutableProjection(Shuffle(ai0, seed1)))
assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) ===
evaluateWithUnsafeProjection(Shuffle(ai0, seed1)))

val seed2 = Some(r.nextLong())
assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !==
evaluateWithoutCodegen(Shuffle(ai0, seed2)))
assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !==
evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2)))
assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) !==
evaluateWithMutableProjection(Shuffle(ai0, seed2)))
assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))

Expand Down

0 comments on commit 12b1e91

Please sign in to comment.