From 0bcbe43e6d5370aee92d36429b7b45ee9f6568e4 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 17 Aug 2017 18:28:54 +0100 Subject: [PATCH 1/5] Initial commit --- .../expressions/codegen/CodeGenerator.scala | 30 +++++++ .../expressions/conditionalExpressions.scala | 29 +------ .../sql/catalyst/expressions/predicates.scala | 82 ++++++++++++++++++- .../expressions/CodeGenerationSuite.scala | 39 +++++++++ 4 files changed, 152 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 2cb66599076a9..7873ab84da1cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -808,6 +808,36 @@ class CodegenContext { } } + /** + * Wrap the generated code of expression by a function. ev,isNull and ev.value are passed + * by global variables + * + * @param ev the code to evaluate expressions. + * @param dataType the data type of ev.value. + * @param baseFuncName the split function name base. + */ + def createAndAddFunction( + ev: ExprCode, + dataType: DataType, + baseFuncName: String): (String, String, String) = { + val globalIsNull = freshName("isNull") + addMutableState("boolean", globalIsNull, s"$globalIsNull = false;") + val globalValue = freshName("value") + addMutableState(javaType(dataType), globalValue, + s"$globalValue = ${defaultValue(dataType)};") + val funcName = freshName(baseFuncName) + val funcBody = + s""" + |private void $funcName(InternalRow ${INPUT_ROW}) { + | ${ev.code.trim} + | $globalIsNull = ${ev.isNull}; + | $globalValue = ${ev.value}; + |} + """.stripMargin + val fullFuncName = addNewFunction(funcName, funcBody) + (fullFuncName, globalIsNull, globalValue) + } + /** * Perform a function which generates a sequence of ExprCodes with a given mapping between * expressions and common expressions, instead of using the mapping in current context. 
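The helper added above hoists the generated evaluation code of a single expression into its own private method on the generated class; since a Java method cannot return both a null flag and a value, the results are handed back through two freshly named mutable global fields. A rough, standalone sketch of the intended call pattern, assuming this patch is applied (the expression and names here are illustrative; the real callers are the If and And/Or hunks that follow in this series):

    import org.apache.spark.sql.catalyst.expressions.Literal
    import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
    import org.apache.spark.sql.types.BooleanType

    val ctx = new CodegenContext
    // generated code for some predicate expression (a trivial literal here)
    val predEval = Literal(true).genCode(ctx)
    val (funcName, globalIsNull, globalValue) =
      ctx.createAndAddFunction(predEval, BooleanType, "evalPredExpr")

    // instead of inlining predEval.code, a caller now emits a call to the split
    // method and reads the result back from the two global fields
    val callerCode =
      s"""
         |$funcName(${ctx.INPUT_ROW});
         |boolean result = !$globalIsNull && $globalValue;
       """.stripMargin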
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala index d95b59d5ec423..c41a10c7b0f87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala @@ -72,11 +72,11 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi (ctx.INPUT_ROW != null && ctx.currentVars == null)) { val (condFuncName, condGlobalIsNull, condGlobalValue) = - createAndAddFunction(ctx, condEval, predicate.dataType, "evalIfCondExpr") + ctx.createAndAddFunction(condEval, predicate.dataType, "evalIfCondExpr") val (trueFuncName, trueGlobalIsNull, trueGlobalValue) = - createAndAddFunction(ctx, trueEval, trueValue.dataType, "evalIfTrueExpr") + ctx.createAndAddFunction(trueEval, trueValue.dataType, "evalIfTrueExpr") val (falseFuncName, falseGlobalIsNull, falseGlobalValue) = - createAndAddFunction(ctx, falseEval, falseValue.dataType, "evalIfFalseExpr") + ctx.createAndAddFunction(falseEval, falseValue.dataType, "evalIfFalseExpr") s""" $condFuncName(${ctx.INPUT_ROW}); boolean ${ev.isNull} = false; @@ -112,29 +112,6 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi ev.copy(code = generatedCode) } - private def createAndAddFunction( - ctx: CodegenContext, - ev: ExprCode, - dataType: DataType, - baseFuncName: String): (String, String, String) = { - val globalIsNull = ctx.freshName("isNull") - ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;") - val globalValue = ctx.freshName("value") - ctx.addMutableState(ctx.javaType(dataType), globalValue, - s"$globalValue = ${ctx.defaultValue(dataType)};") - val funcName = ctx.freshName(baseFuncName) - val funcBody = - s""" - |private void $funcName(InternalRow ${ctx.INPUT_ROW}) { - | ${ev.code.trim} - | $globalIsNull = ${ev.isNull}; - | $globalValue = ${ev.value}; - |} - """.stripMargin - val fullFuncName = ctx.addNewFunction(funcName, funcBody) - (fullFuncName, globalIsNull, globalValue) - } - override def toString: String = s"if ($predicate) $trueValue else $falseValue" override def sql: String = s"(IF(${predicate.sql}, ${trueValue.sql}, ${falseValue.sql}))" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala index efcd45fad779c..61df5e053a374 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala @@ -368,7 +368,46 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with val eval2 = right.genCode(ctx) // The result should be `false`, if any of them is `false` whenever the other is null or not. 
- if (!left.nullable && !right.nullable) { + + // place generated code of eval1 and eval2 in separate methods if their code combined is large + val combinedLength = eval1.code.length + eval2.code.length + if (combinedLength > 1024 && + // Split these expressions only if they are created from a row object + (ctx.INPUT_ROW != null && ctx.currentVars == null)) { + + val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) = + ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr") + val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) = + ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr") + if (!left.nullable && !right.nullable) { + val generatedCode = s""" + $eval1FuncName(${ctx.INPUT_ROW}); + boolean ${ev.value} = false; + if (${eval1GlobalValue}) { + $eval2FuncName(${ctx.INPUT_ROW}); + ${ev.value} = ${eval2GlobalValue}; + } + """ + ev.copy(code = generatedCode, isNull = "false") + } else { + val generatedCode = s""" + $eval1FuncName(${ctx.INPUT_ROW}); + boolean ${ev.isNull} = false; + boolean ${ev.value} = false; + if (!${eval1GlobalIsNull} && !${eval1GlobalValue}) { + } else { + $eval2FuncName(${ctx.INPUT_ROW}); + if (!${eval2GlobalIsNull} && !${eval2GlobalValue}) { + } else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) { + ${ev.value} = true; + } else { + ${ev.isNull} = true; + } + } + """ + ev.copy(code = generatedCode) + } + } else if (!left.nullable && !right.nullable) { ev.copy(code = s""" ${eval1.code} boolean ${ev.value} = false; @@ -431,7 +470,46 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P val eval2 = right.genCode(ctx) // The result should be `true`, if any of them is `true` whenever the other is null or not. - if (!left.nullable && !right.nullable) { + + // place generated code of eval1 and eval2 in separate methods if their code combined is large + val combinedLength = eval1.code.length + eval2.code.length + if (combinedLength > 1024 && + // Split these expressions only if they are created from a row object + (ctx.INPUT_ROW != null && ctx.currentVars == null)) { + + val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) = + ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr") + val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) = + ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr") + if (!left.nullable && !right.nullable) { + val generatedCode = s""" + $eval1FuncName(${ctx.INPUT_ROW}); + boolean ${ev.value} = true; + if (!${eval1GlobalValue}) { + $eval2FuncName(${ctx.INPUT_ROW}); + ${ev.value} = ${eval2GlobalValue}; + } + """ + ev.copy(code = generatedCode, isNull = "false") + } else { + val generatedCode = s""" + $eval1FuncName(${ctx.INPUT_ROW}); + boolean ${ev.isNull} = false; + boolean ${ev.value} = true; + if (!${eval1GlobalIsNull} && ${eval1GlobalValue}) { + } else { + $eval2FuncName(${ctx.INPUT_ROW}); + if (!${eval2GlobalIsNull} && ${eval2GlobalValue}) { + } else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) { + ${ev.value} = false; + } else { + ${ev.isNull} = true; + } + } + """ + ev.copy(code = generatedCode) + } + } else if (!left.nullable && !right.nullable) { ev.isNull = "false" ev.copy(code = s""" ${eval1.code} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 7ea0bec145481..368f8e1b723f7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -324,4 +324,43 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { // should not throw exception projection(row) } + + test("SPARK-21720: split large predications into blocks due to JVM code size limit") { + val length = 600 + + val input = new GenericInternalRow(length) + val utf8Str = UTF8String.fromString(s"abc") + for (i <- 0 until length) { + input.update(i, utf8Str) + } + + var exprOr: Expression = Literal(false) + for (i <- 0 until length) { + exprOr = Or(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprOr) + } + + val planOr = GenerateMutableProjection.generate(Seq(exprOr)) + val actualOr = planOr(input).toSeq(Seq(exprOr.dataType)) + assert(actualOr.length == 1) + val expectedOr = false + + if (!checkResult(actualOr.head, expectedOr, exprOr.dataType)) { + fail(s"Incorrect Evaluation: expressions: $exprOr, actual: $actualOr, expected: $expectedOr") + } + + var exprAnd: Expression = Literal(true) + for (i <- 0 until length) { + exprAnd = And(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprAnd) + } + + val planAnd = GenerateMutableProjection.generate(Seq(exprAnd)) + val actualAnd = planAnd(input).toSeq(Seq(exprAnd.dataType)) + assert(actualAnd.length == 1) + val expectedAnd = false + + if (!checkResult(actualAnd.head, expectedAnd, exprAnd.dataType)) { + fail( + s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, expected: $expectedAnd") + } + } } From 36e0b76aea231542a75795777a2895e555c0e8fa Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 18 Aug 2017 13:34:41 +0100 Subject: [PATCH 2/5] address review comment --- .../sql/catalyst/expressions/codegen/CodeGenerator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 7873ab84da1cf..0ba50b6a1b013 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -809,8 +809,8 @@ class CodegenContext { } /** - * Wrap the generated code of expression by a function. ev,isNull and ev.value are passed - * by global variables + * Wrap the generated code of expression, which was created from a row object in INPUT_ROW, + * by a function. ev,isNull and ev.value are passed by global variables * * @param ev the code to evaluate expressions. * @param dataType the data type of ev.value. 
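The nullable branches added to And and Or in predicates.scala above look unusual (empty if bodies), but they mirror the existing inline branches so the split code path keeps exactly the same three-valued SQL semantics: false wins for AND and true wins for OR even when the other side is NULL. As a compact reference (illustrative Scala, not part of the patch; None stands in for a SQL NULL boolean):

    def and3(l: Option[Boolean], r: Option[Boolean]): Option[Boolean] = (l, r) match {
      case (Some(false), _) | (_, Some(false)) => Some(false) // false dominates, even vs. NULL
      case (Some(true), Some(true))            => Some(true)
      case _                                   => None        // anything else is NULL
    }

    def or3(l: Option[Boolean], r: Option[Boolean]): Option[Boolean] = (l, r) match {
      case (Some(true), _) | (_, Some(true))   => Some(true)  // true dominates, even vs. NULL
      case (Some(false), Some(false))          => Some(false)
      case _                                   => None
    }

The new CodeGenerationSuite test above chains 600 comparisons under each operator, which is large enough that the outer And/Or nodes trip the 1024-character split threshold while still producing a deterministic false result (every column holds "abc" while the comparisons look for "c0", "c1", ...).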
From c9bc39562a8be4ba2d1a430a3346649628b451be Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Thu, 12 Oct 2017 18:05:03 +0100 Subject: [PATCH 3/5] fix test failure of DataFrameSuite.SPARK-19372 --- .../test/scala/org/apache/spark/sql/DataFrameSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index ad461fa6144b3..497b7892e55f3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2067,7 +2067,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .count } - testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { + test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { val N = 400 val rows = Seq(Row.fromSeq(Seq.fill(N)("string"))) val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType))) @@ -2081,10 +2081,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") { - val e = intercept[SparkException] { - df.filter(filter).count() - }.getMessage - assert(e.contains("grows beyond 64 KB")) + // SPARK-21720 avoids an exception due to JVM code size limit + df.filter(filter).count() } } From 2f0655575de285aefafbda79cd05695ac1fa6154 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Fri, 10 Nov 2017 17:36:39 +0000 Subject: [PATCH 4/5] fix typo --- .../spark/sql/catalyst/expressions/codegen/CodeGenerator.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index 0ba50b6a1b013..27fe210394264 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -810,7 +810,7 @@ class CodegenContext { /** * Wrap the generated code of expression, which was created from a row object in INPUT_ROW, - * by a function. ev,isNull and ev.value are passed by global variables + * by a function. ev.isNull and ev.value are passed by global variables * * @param ev the code to evaluate expressions. * @param dataType the data type of ev.value. 
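Patch 3/5 follows directly from the main change: with And and Or now splitting oversized operands into separate methods, the 400-column filter in DataFrameSuite's SPARK-19372 test compiles even when codegen fallback is disabled, so the old intercept of a "grows beyond 64 KB" SparkException can no longer fire and the test simply runs the count. The flag pinned by withSQLConf matters here: with the fallback enabled, a compilation failure of the generated code is quietly absorbed by switching to a non-codegen path, which would mask the behavior this test is probing. A hypothetical way to pin it in an interactive session (using the same SQLConf entry the test references):

    import org.apache.spark.sql.internal.SQLConf
    spark.conf.set(SQLConf.CODEGEN_FALLBACK.key, "false")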
From bf3549882982bcddd7d4d0b38c61d939c2c8a046 Mon Sep 17 00:00:00 2001 From: Kazuaki Ishizaki Date: Sun, 12 Nov 2017 11:47:09 +0000 Subject: [PATCH 5/5] address review comment --- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 497b7892e55f3..d806140b61337 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -2067,7 +2067,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { .count } - test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { + // The fix of SPARK-21720 avoid an exception regarding JVM code size limit + // TODO: When we make a threshold of splitting statements (1024) configurable, + // we will re-enable this with max threshold to cause an exception + // See https://github.com/apache/spark/pull/18972/files#r150223463 + ignore("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") { val N = 400 val rows = Seq(Row.fromSeq(Seq.fill(N)("string"))) val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType))) @@ -2081,8 +2085,10 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") { - // SPARK-21720 avoids an exception due to JVM code size limit - df.filter(filter).count() + val e = intercept[SparkException] { + df.filter(filter).count() + }.getMessage + assert(e.contains("grows beyond 64 KB")) } }
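Taken together, the series makes And and Or hand each operand's generated code to the new createAndAddFunction helper whenever their combined source exceeds 1024 characters and the code is generated against an input row, which helps keep any single generated method under the JVM's 64 KB bytecode limit. A hypothetical end-to-end check in a spark-shell with this series applied; the DataFrame and filter below only mirror the shape of the SPARK-19372 test (rows, schema and a wide OR of string comparisons) and are not the elided test body:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.functions.lit
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val n = 400
    val schema = StructType(Seq.tabulate(n)(i => StructField(s"_c$i", StringType)))
    val df = spark.createDataFrame(
      spark.sparkContext.parallelize(Seq(Row.fromSeq(Seq.fill(n)("string")))), schema)

    // a 400-way OR chain over string equality, wide enough to trigger the split
    val bigFilter = (0 until n).foldLeft(lit(false)) { (pred, i) =>
      pred.or(df.col(s"_c$i") === "string")
    }
    df.filter(bigFilter).count()   // expected: 1, with no "grows beyond 64 KB" error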