Skip to content

Commit

Permalink
[SPARK-21720][SQL] Fix 64KB JVM bytecode limit problem with AND or OR
Browse files Browse the repository at this point in the history
This PR changes `AND` or `OR` code generation to place condition and then expressions' generated code into separated methods if these size could be large. When the method is newly generated, variables for `isNull` and `value` are declared as an instance variable to pass these values (e.g. `isNull1409` and `value1409`) to the callers of the generated method.

This PR resolved two cases:

* large code size of left expression
* large code size of right expression

Added a new test case into `CodeGenerationSuite`

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18972 from kiszk/SPARK-21720.

(cherry picked from commit 9bf696d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
kiszk authored and cloud-fan committed Nov 12, 2017
1 parent f6ee3d9 commit 114dc42
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,36 @@ class CodegenContext {
}
}

/**
* Wrap the generated code of expression, which was created from a row object in INPUT_ROW,
* by a function. ev.isNull and ev.value are passed by global variables
*
* @param ev the code to evaluate expressions.
* @param dataType the data type of ev.value.
* @param baseFuncName the split function name base.
*/
def createAndAddFunction(
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = freshName("isNull")
addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = freshName("value")
addMutableState(javaType(dataType), globalValue,
s"$globalValue = ${defaultValue(dataType)};")
val funcName = freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${INPUT_ROW}) {
| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
addNewFunction(funcName, funcBody)
(funcName, globalIsNull, globalValue)
}

/**
* Perform a function which generates a sequence of ExprCodes with a given mapping between
* expressions and common expressions, instead of using the mapping in current context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (condFuncName, condGlobalIsNull, condGlobalValue) =
createAndAddFunction(ctx, condEval, predicate.dataType, "evalIfCondExpr")
ctx.createAndAddFunction(condEval, predicate.dataType, "evalIfCondExpr")
val (trueFuncName, trueGlobalIsNull, trueGlobalValue) =
createAndAddFunction(ctx, trueEval, trueValue.dataType, "evalIfTrueExpr")
ctx.createAndAddFunction(trueEval, trueValue.dataType, "evalIfTrueExpr")
val (falseFuncName, falseGlobalIsNull, falseGlobalValue) =
createAndAddFunction(ctx, falseEval, falseValue.dataType, "evalIfFalseExpr")
ctx.createAndAddFunction(falseEval, falseValue.dataType, "evalIfFalseExpr")
s"""
$condFuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
Expand Down Expand Up @@ -112,29 +112,6 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
ev.copy(code = generatedCode)
}

private def createAndAddFunction(
ctx: CodegenContext,
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = ctx.freshName("isNull")
ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = ctx.freshName("value")
ctx.addMutableState(ctx.javaType(dataType), globalValue,
s"$globalValue = ${ctx.defaultValue(dataType)};")
val funcName = ctx.freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${ctx.INPUT_ROW}) {
| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
ctx.addNewFunction(funcName, funcBody)
(funcName, globalIsNull, globalValue)
}

override def toString: String = s"if ($predicate) $trueValue else $falseValue"

override def sql: String = s"(IF(${predicate.sql}, ${trueValue.sql}, ${falseValue.sql}))"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,46 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
val eval2 = right.genCode(ctx)

// The result should be `false`, if any of them is `false` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = false;
if (${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = false;
if (!${eval1GlobalIsNull} && !${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && !${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = true;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.copy(code = s"""
${eval1.code}
boolean ${ev.value} = false;
Expand Down Expand Up @@ -407,7 +446,46 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P
val eval2 = right.genCode(ctx)

// The result should be `true`, if any of them is `true` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = true;
if (!${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = true;
if (!${eval1GlobalIsNull} && ${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && ${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = false;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.isNull = "false"
ev.copy(code = s"""
${eval1.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,43 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
// should not throw exception
projection(row)
}

test("SPARK-21720: split large predications into blocks due to JVM code size limit") {
val length = 600

val input = new GenericInternalRow(length)
val utf8Str = UTF8String.fromString(s"abc")
for (i <- 0 until length) {
input.update(i, utf8Str)
}

var exprOr: Expression = Literal(false)
for (i <- 0 until length) {
exprOr = Or(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprOr)
}

val planOr = GenerateMutableProjection.generate(Seq(exprOr))
val actualOr = planOr(input).toSeq(Seq(exprOr.dataType))
assert(actualOr.length == 1)
val expectedOr = false

if (!checkResult(actualOr.head, expectedOr, exprOr.dataType)) {
fail(s"Incorrect Evaluation: expressions: $exprOr, actual: $actualOr, expected: $expectedOr")
}

var exprAnd: Expression = Literal(true)
for (i <- 0 until length) {
exprAnd = And(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprAnd)
}

val planAnd = GenerateMutableProjection.generate(Seq(exprAnd))
val actualAnd = planAnd(input).toSeq(Seq(exprAnd.dataType))
assert(actualAnd.length == 1)
val expectedAnd = false

if (!checkResult(actualAnd.head, expectedAnd, exprAnd.dataType)) {
fail(
s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, expected: $expectedAnd")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1733,7 +1733,11 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.count
}

testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
// The fix of SPARK-21720 avoid an exception regarding JVM code size limit
// TODO: When we make a threshold of splitting statements (1024) configurable,
// we will re-enable this with max threshold to cause an exception
// See https://github.com/apache/spark/pull/18972/files#r150223463
ignore("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
val N = 400
val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
Expand Down

0 comments on commit 114dc42

Please sign in to comment.