From 76ca37075398f890d19775e0037cb92072af53d2 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 6 Sep 2018 23:48:04 +0900 Subject: [PATCH 1/2] Fix --- .../InterpretedMutableProjection.scala | 89 +++++++++++++++++++ .../sql/catalyst/expressions/Projection.scala | 75 ++++++++-------- .../codegen/GenerateMutableProjection.scala | 4 + .../sql/catalyst/expressions/package.scala | 18 +--- ...eneratorWithInterpretedFallbackSuite.scala | 39 +++++++- .../CollectionExpressionsSuite.scala | 8 +- .../expressions/ExpressionEvalHelper.scala | 34 ++++--- .../expressions/MiscExpressionsSuite.scala | 10 +-- .../expressions/ObjectExpressionsSuite.scala | 8 +- .../spark/sql/execution/SparkPlan.scala | 2 +- .../spark/sql/execution/aggregate/udaf.scala | 2 +- 11 files changed, 202 insertions(+), 87 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala new file mode 100644 index 0000000000000..0654108cea281 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InterpretedMutableProjection.scala @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp + + +/** + * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified + * expressions. + * + * @param expressions a sequence of expressions that determine the value of each column of the + * output row. + */ +class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { + def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = + this(toBoundExprs(expressions, inputSchema)) + + private[this] val buffer = new Array[Any](expressions.size) + + override def initialize(partitionIndex: Int): Unit = { + expressions.foreach(_.foreach { + case n: Nondeterministic => n.initialize(partitionIndex) + case _ => + }) + } + + private[this] val validExprs = expressions.zipWithIndex.filter { + case (NoOp, _) => false + case _ => true + } + private[this] var mutableRow: InternalRow = new GenericInternalRow(expressions.size) + def currentValue: InternalRow = mutableRow + + override def target(row: InternalRow): MutableProjection = { + mutableRow = row + this + } + + override def apply(input: InternalRow): InternalRow = { + var i = 0 + while (i < validExprs.length) { + val (expr, ordinal) = validExprs(i) + // Store the result into buffer first, to make the projection atomic (needed by aggregation) + buffer(ordinal) = expr.eval(input) + i += 1 + } + i = 0 + while (i < validExprs.length) { + val (_, ordinal) = validExprs(i) + 
mutableRow(ordinal) = buffer(ordinal) + i += 1 + } + mutableRow + } +} + +/** + * Helper functions for creating an [[InterpretedMutableProjection]]. + */ +object InterpretedMutableProjection { + + /** + * Returns a [[MutableProjection]] for given sequence of bound Expressions. + */ + def createProjection(exprs: Seq[Expression]): MutableProjection = { + // We need to make sure that we do not reuse stateful expressions. + val cleanedExpressions = exprs.map(_.transform { + case s: Stateful => s.freshCopy() + }) + new InterpretedMutableProjection(cleanedExpressions) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala index 5f24170398715..792646cf9f10c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection, GenerateUnsafeProjection} +import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateMutableProjection, GenerateSafeProjection, GenerateUnsafeProjection} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} @@ -56,47 +56,50 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection { } /** - * A [[MutableProjection]] that is calculated by calling `eval` on each of the specified - * expressions. + * Converts a [[InternalRow]] to another Row given a sequence of expression that define each + * column of the new row. If the schema of the input row is specified, then the given expression + * will be bound to that schema. 
* - * @param expressions a sequence of expressions that determine the value of each column of the - * output row. + * In contrast to a normal projection, a MutableProjection reuses the same underlying row object + * each time an input row is added. This significantly reduces the cost of calculating the + * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after + * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call + * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`. */ -case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection { - def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) = - this(expressions.map(BindReferences.bindReference(_, inputSchema))) +abstract class MutableProjection extends Projection { + def currentValue: InternalRow - private[this] val buffer = new Array[Any](expressions.size) + /** Uses the given row to store the output of the projection. */ + def target(row: InternalRow): MutableProjection +} - override def initialize(partitionIndex: Int): Unit = { - expressions.foreach(_.foreach { - case n: Nondeterministic => n.initialize(partitionIndex) - case _ => - }) +/** + * The factory object for `MutableProjection`. 
+ */ +object MutableProjection + extends CodeGeneratorWithInterpretedFallback[Seq[Expression], MutableProjection] { + + override protected def createCodeGeneratedObject(in: Seq[Expression]): MutableProjection = { + GenerateMutableProjection.generate(in, SQLConf.get.subexpressionEliminationEnabled) } - private[this] val exprArray = expressions.toArray - private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length) - def currentValue: InternalRow = mutableRow + override protected def createInterpretedObject(in: Seq[Expression]): MutableProjection = { + InterpretedMutableProjection.createProjection(in) + } - override def target(row: InternalRow): MutableProjection = { - mutableRow = row - this + /** + * Returns a MutableProjection for given sequence of bound Expressions. + */ + def create(exprs: Seq[Expression]): MutableProjection = { + createObject(exprs) } - override def apply(input: InternalRow): InternalRow = { - var i = 0 - while (i < exprArray.length) { - // Store the result into buffer first, to make the projection atomic (needed by aggregation) - buffer(i) = exprArray(i).eval(input) - i += 1 - } - i = 0 - while (i < exprArray.length) { - mutableRow(i) = buffer(i) - i += 1 - } - mutableRow + /** + * Returns a MutableProjection for given sequence of Expressions, which will be bound to + * `inputSchema`.
+ */ + def create(exprs: Seq[Expression], inputSchema: Seq[Attribute]): MutableProjection = { + create(toBoundExprs(exprs, inputSchema)) } } @@ -123,12 +126,6 @@ object UnsafeProjection InterpretedUnsafeProjection.createProjection(in) } - protected def toBoundExprs( - exprs: Seq[Expression], - inputSchema: Seq[Attribute]): Seq[Expression] = { - exprs.map(BindReferences.bindReference(_, inputSchema)) - } - protected def toUnsafeExprs(exprs: Seq[Expression]): Seq[Expression] = { exprs.map(_ transform { case CreateNamedStruct(children) => CreateNamedStructUnsafe(children) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 33d14329ec95c..d588e7f081303 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -44,6 +44,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP create(canonicalize(bind(expressions, inputSchema)), useSubexprElimination) } + def generate(expressions: Seq[Expression], useSubexprElimination: Boolean): MutableProjection = { + create(canonicalize(expressions), useSubexprElimination) + } + protected def create(expressions: Seq[Expression]): MutableProjection = { create(expressions, false) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index 11dcc3ebf798c..0083ee64653e9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -86,24 +86,12 @@ package object expressions { } /** - * 
Converts a [[InternalRow]] to another Row given a sequence of expression that define each - * column of the new row. If the schema of the input row is specified, then the given expression - * will be bound to that schema. - * - * In contrast to a normal projection, a MutableProjection reuses the same underlying row object - * each time an input row is added. This significantly reduces the cost of calculating the - * projection, but means that it is not safe to hold on to a reference to a [[InternalRow]] after - * `next()` has been called on the [[Iterator]] that produced it. Instead, the user must call - * `InternalRow.copy()` and hold on to the returned [[InternalRow]] before calling `next()`. + * A helper function to bind given expressions to an input schema. */ - abstract class MutableProjection extends Projection { - def currentValue: InternalRow - - /** Uses the given row to store the output of the projection. */ - def target(row: InternalRow): MutableProjection + def toBoundExprs(exprs: Seq[Expression], inputSchema: Seq[Attribute]): Seq[Expression] = { + exprs.map(BindReferences.bindReference(_, inputSchema)) } - /** * Helper functions for working with `Seq[Attribute]`. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala index 28edd85ab6e87..ec5a2f8f7ab02 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala @@ -20,13 +20,18 @@ package org.apache.spark.sql.catalyst.expressions import java.util.concurrent.ExecutionException import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator} import org.apache.spark.sql.catalyst.plans.PlanTestBase import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.types.{IntegerType, StructType} class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanTestBase { + val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString + val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString + object FailedCodegenProjection extends CodeGeneratorWithInterpretedFallback[Seq[Expression], UnsafeProjection] { @@ -44,19 +49,30 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT test("UnsafeProjection with codegen factory mode") { val input = Seq(BoundReference(0, IntegerType, nullable = true)) - val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { val obj = UnsafeProjection.createObject(input) assert(obj.getClass.getName.contains("GeneratedClass$SpecificUnsafeProjection")) } - val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString 
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) { val obj = UnsafeProjection.createObject(input) assert(obj.isInstanceOf[InterpretedUnsafeProjection]) } } + test("MutableProjection with codegen factory mode") { + val input = Seq(BoundReference(0, IntegerType, nullable = true)) + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { + val obj = MutableProjection.createObject(input) + assert(obj.getClass.getName.contains("GeneratedClass$SpecificMutableProjection")) + } + + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) { + val obj = MutableProjection.createObject(input) + assert(obj.isInstanceOf[InterpretedMutableProjection]) + } + } + test("fallback to the interpreter mode") { val input = Seq(BoundReference(0, IntegerType, nullable = true)) val fallback = CodegenObjectFactoryMode.FALLBACK.toString @@ -69,11 +85,26 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT test("codegen failures in the CODEGEN_ONLY mode") { val errMsg = intercept[ExecutionException] { val input = Seq(BoundReference(0, IntegerType, nullable = true)) - val codegenOnly = CodegenObjectFactoryMode.CODEGEN_ONLY.toString withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { FailedCodegenProjection.createObject(input) } + val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString }.getMessage assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:")) } + + test("SPARK-25358 Correctly handles NoOp in MutableProjection") { + val exprs = Seq(Add(BoundReference(0, IntegerType, nullable = true), Literal.create(1)), NoOp) + val input = InternalRow.fromSeq(1 :: 1 :: Nil) + val expected = 2 :: null :: Nil + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { + val proj = MutableProjection.createObject(exprs) + assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) + } + + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> noCodegen) { + val proj = 
MutableProjection.createObject(exprs) + assert(proj(input).toSeq(StructType.fromDDL("c0 int, c1 int")) === expected) + } + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala index c7db4ec9e16b1..2e0adbb465008 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala @@ -1510,16 +1510,16 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper val seed1 = Some(r.nextLong()) assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) === evaluateWithoutCodegen(Shuffle(ai0, seed1))) - assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) === - evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1))) + assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) === + evaluateWithMutableProjection(Shuffle(ai0, seed1))) assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) === evaluateWithUnsafeProjection(Shuffle(ai0, seed1))) val seed2 = Some(r.nextLong()) assert(evaluateWithoutCodegen(Shuffle(ai0, seed1)) !== evaluateWithoutCodegen(Shuffle(ai0, seed2))) - assert(evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed1)) !== - evaluateWithGeneratedMutableProjection(Shuffle(ai0, seed2))) + assert(evaluateWithMutableProjection(Shuffle(ai0, seed1)) !== + evaluateWithMutableProjection(Shuffle(ai0, seed2))) assert(evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !== evaluateWithUnsafeProjection(Shuffle(ai0, seed2))) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 6684e5ce18d4c..b5986aac65552 100644 --- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -60,7 +60,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa def expr = prepareEvaluation(expression) val catalystValue = CatalystTypeConverters.convertToCatalyst(expected) checkEvaluationWithoutCodegen(expr, catalystValue, inputRow) - checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow) + checkEvaluationWithMutableProjection(expr, catalystValue, inputRow) if (GenerateUnsafeProjection.canSupport(expr.dataType)) { checkEvaluationWithUnsafeProjection(expr, catalystValue, inputRow) } @@ -136,7 +136,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa // Make it as method to obtain fresh expression everytime. def expr = prepareEvaluation(expression) checkException(evaluateWithoutCodegen(expr, inputRow), "non-codegen mode") - checkException(evaluateWithGeneratedMutableProjection(expr, inputRow), "codegen mode") + checkException(evaluateWithMutableProjection(expr, inputRow), "codegen mode") if (GenerateUnsafeProjection.canSupport(expr.dataType)) { checkException(evaluateWithUnsafeProjection(expr, inputRow), "unsafe mode") } @@ -183,22 +183,28 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa } } - protected def checkEvaluationWithGeneratedMutableProjection( - expression: Expression, + protected def checkEvaluationWithMutableProjection( + expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = { - val actual = evaluateWithGeneratedMutableProjection(expression, inputRow) - if (!checkResult(actual, expected, expression.dataType)) { - val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" - fail(s"Incorrect evaluation: $expression, actual: $actual, expected: $expected$input") + val modes = 
Seq(CodegenObjectFactoryMode.CODEGEN_ONLY, CodegenObjectFactoryMode.NO_CODEGEN) + for (fallbackMode <- modes) { + withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> fallbackMode.toString) { + val actual = evaluateWithMutableProjection(expression, inputRow) + if (!checkResult(actual, expected, expression.dataType)) { + val input = if (inputRow == EmptyRow) "" else s", input: $inputRow" + fail(s"Incorrect evaluation (fallback mode = $fallbackMode): $expression, " + + s"actual: $actual, expected: $expected$input") + } + } } } - protected def evaluateWithGeneratedMutableProjection( - expression: Expression, + protected def evaluateWithMutableProjection( + expression: => Expression, inputRow: InternalRow = EmptyRow): Any = { val plan = generateProject( - GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil), + MutableProjection.create(Alias(expression, s"Optimized($expression)")() :: Nil), expression) plan.initialize(0) @@ -218,7 +224,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa if (expected == null) { if (!unsafeRow.isNullAt(0)) { val expectedRow = InternalRow(expected, expected) - fail("Incorrect evaluation in unsafe mode: " + + fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " + s"$expression, actual: $unsafeRow, expected: $expectedRow$input") } } else { @@ -226,7 +232,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa val expectedRow = UnsafeProjection.create(Array(expression.dataType, expression.dataType)).apply(lit) if (unsafeRow != expectedRow) { - fail("Incorrect evaluation in unsafe mode: " + + fail(s"Incorrect evaluation in unsafe mode (fallback mode = $fallbackMode): " + s"$expression, actual: $unsafeRow, expected: $expectedRow$input") } } @@ -266,7 +272,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks with PlanTestBa expected: Spread[Double], inputRow: InternalRow = EmptyRow): Unit = { 
checkEvaluationWithoutCodegen(expression, expected) - checkEvaluationWithGeneratedMutableProjection(expression, expected) + checkEvaluationWithMutableProjection(expression, expected) checkEvaluationWithOptimization(expression, expected) var plan = generateProject( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala index b6c269348b002..4b2d153a28cc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/MiscExpressionsSuite.scala @@ -48,15 +48,15 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val r = new Random() val seed1 = Some(r.nextLong()) assert(evaluateWithoutCodegen(Uuid(seed1)) === evaluateWithoutCodegen(Uuid(seed1))) - assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) === - evaluateWithGeneratedMutableProjection(Uuid(seed1))) + assert(evaluateWithMutableProjection(Uuid(seed1)) === + evaluateWithMutableProjection(Uuid(seed1))) assert(evaluateWithUnsafeProjection(Uuid(seed1)) === evaluateWithUnsafeProjection(Uuid(seed1))) val seed2 = Some(r.nextLong()) assert(evaluateWithoutCodegen(Uuid(seed1)) !== evaluateWithoutCodegen(Uuid(seed2))) - assert(evaluateWithGeneratedMutableProjection(Uuid(seed1)) !== - evaluateWithGeneratedMutableProjection(Uuid(seed2))) + assert(evaluateWithMutableProjection(Uuid(seed1)) !== + evaluateWithMutableProjection(Uuid(seed2))) assert(evaluateWithUnsafeProjection(Uuid(seed1)) !== evaluateWithUnsafeProjection(Uuid(seed2))) @@ -79,7 +79,7 @@ class MiscExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val outputEval = errorStream.toString errorStream.reset() // check with codegen - checkEvaluationWithGeneratedMutableProjection(PrintToStderr(inputExpr), 1) + 
checkEvaluationWithMutableProjection(PrintToStderr(inputExpr), 1) val outputCodegen = errorStream.toString (outputEval, outputCodegen) } finally { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala index b0af9e07d1d1d..d145fd0aaba47 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ObjectExpressionsSuite.scala @@ -72,7 +72,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { val cls = classOf[Tuple2[Boolean, java.lang.Integer]] val inputObject = BoundReference(0, ObjectType(cls), nullable = true) val invoke = Invoke(inputObject, "_2", IntegerType) - checkEvaluationWithGeneratedMutableProjection(invoke, null, inputRow) + checkEvaluationWithMutableProjection(invoke, null, inputRow) } test("MapObjects should make copies of unsafe-backed data") { @@ -233,13 +233,13 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { Literal.fromObject(new TestBean), Map("setNonPrimitive" -> Literal(null))) evaluateWithoutCodegen(initializeBean, InternalRow.fromSeq(Seq())) - evaluateWithGeneratedMutableProjection(initializeBean, InternalRow.fromSeq(Seq())) + evaluateWithMutableProjection(initializeBean, InternalRow.fromSeq(Seq())) val initializeBean2 = InitializeJavaBean( Literal.fromObject(new TestBean), Map("setNonPrimitive" -> Literal("string"))) evaluateWithoutCodegen(initializeBean2, InternalRow.fromSeq(Seq())) - evaluateWithGeneratedMutableProjection(initializeBean2, InternalRow.fromSeq(Seq())) + evaluateWithMutableProjection(initializeBean2, InternalRow.fromSeq(Seq())) } test("SPARK-23585: UnwrapOption should support interpreted execution") { @@ -273,7 +273,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with 
ExpressionEvalHelper { val resolver = ResolveTimeZone(new SQLConf) val expr = resolver.resolveTimeZones(serializer.deserialize(serializer.serialize(expression))) checkEvaluationWithoutCodegen(expr, expected, inputRow) - checkEvaluationWithGeneratedMutableProjection(expr, expected, inputRow) + checkEvaluationWithMutableProjection(expr, expected, inputRow) if (GenerateUnsafeProjection.canSupport(expr.dataType)) { checkEvaluationWithUnsafeProjection( expr, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 1f97993e20458..ab6031c436e9d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -380,7 +380,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ inputSchema: Seq[Attribute], useSubexprElimination: Boolean = false): MutableProjection = { log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema") - GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination) + MutableProjection.create(expressions, inputSchema) } private def genInterpretedPredicate( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala index 72aa4adff4e64..100486fa9850f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala @@ -365,7 +365,7 @@ case class ScalaUDAF( val inputAttributes = childrenSchema.toAttributes log.debug( s"Creating MutableProj: $children, inputSchema: $inputAttributes.") - GenerateMutableProjection.generate(children, inputAttributes) + MutableProjection.create(children, inputAttributes) } private[this] lazy val inputToScalaConverters: Any => Any = From 
87491732e1473880077daf9ce5c080845f4742e1 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Mon, 17 Sep 2018 21:03:30 +0900 Subject: [PATCH 2/2] Fix --- .../expressions/CodeGeneratorWithInterpretedFallbackSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala index ec5a2f8f7ab02..6ea3b05ff9c1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGeneratorWithInterpretedFallbackSuite.scala @@ -88,7 +88,6 @@ class CodeGeneratorWithInterpretedFallbackSuite extends SparkFunSuite with PlanT withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenOnly) { FailedCodegenProjection.createObject(input) } - val noCodegen = CodegenObjectFactoryMode.NO_CODEGEN.toString }.getMessage assert(errMsg.contains("failed to compile: org.codehaus.commons.compiler.CompileException:")) }