Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-21720][SQL] Fix 64KB JVM bytecode limit problem with AND or OR #18972

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,36 @@ class CodegenContext {
}
}

/**
* Wrap the generated code of expression, which was created from a row object in INPUT_ROW,
* by a function. ev.isNull and ev.value are passed by global variables
*
* @param ev the code to evaluate expressions.
* @param dataType the data type of ev.value.
* @param baseFuncName the split function name base.
*/
def createAndAddFunction(
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = freshName("isNull")
addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = freshName("value")
addMutableState(javaType(dataType), globalValue,
s"$globalValue = ${defaultValue(dataType)};")
val funcName = freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${INPUT_ROW}) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does it work with whole stage codegen? the input is not InternalRow but some variable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it works only if ctx.currentVars == null.
We will follow to support the whole stage codegen as follow-up in other PRs.

| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
val fullFuncName = addNewFunction(funcName, funcBody)
(fullFuncName, globalIsNull, globalValue)
}

/**
* Perform a function which generates a sequence of ExprCodes with a given mapping between
* expressions and common expressions, instead of using the mapping in current context.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (condFuncName, condGlobalIsNull, condGlobalValue) =
createAndAddFunction(ctx, condEval, predicate.dataType, "evalIfCondExpr")
ctx.createAndAddFunction(condEval, predicate.dataType, "evalIfCondExpr")
val (trueFuncName, trueGlobalIsNull, trueGlobalValue) =
createAndAddFunction(ctx, trueEval, trueValue.dataType, "evalIfTrueExpr")
ctx.createAndAddFunction(trueEval, trueValue.dataType, "evalIfTrueExpr")
val (falseFuncName, falseGlobalIsNull, falseGlobalValue) =
createAndAddFunction(ctx, falseEval, falseValue.dataType, "evalIfFalseExpr")
ctx.createAndAddFunction(falseEval, falseValue.dataType, "evalIfFalseExpr")
s"""
$condFuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
Expand Down Expand Up @@ -112,29 +112,6 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expressi
ev.copy(code = generatedCode)
}

private def createAndAddFunction(
ctx: CodegenContext,
ev: ExprCode,
dataType: DataType,
baseFuncName: String): (String, String, String) = {
val globalIsNull = ctx.freshName("isNull")
ctx.addMutableState("boolean", globalIsNull, s"$globalIsNull = false;")
val globalValue = ctx.freshName("value")
ctx.addMutableState(ctx.javaType(dataType), globalValue,
s"$globalValue = ${ctx.defaultValue(dataType)};")
val funcName = ctx.freshName(baseFuncName)
val funcBody =
s"""
|private void $funcName(InternalRow ${ctx.INPUT_ROW}) {
| ${ev.code.trim}
| $globalIsNull = ${ev.isNull};
| $globalValue = ${ev.value};
|}
""".stripMargin
val fullFuncName = ctx.addNewFunction(funcName, funcBody)
(fullFuncName, globalIsNull, globalValue)
}

override def toString: String = s"if ($predicate) $trueValue else $falseValue"

override def sql: String = s"(IF(${predicate.sql}, ${trueValue.sql}, ${falseValue.sql}))"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,46 @@ case class And(left: Expression, right: Expression) extends BinaryOperator with
val eval2 = right.genCode(ctx)

// The result should be `false`, if any of them is `false` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = false;
if (${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = false;
if (!${eval1GlobalIsNull} && !${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && !${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = true;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.copy(code = s"""
${eval1.code}
boolean ${ev.value} = false;
Expand Down Expand Up @@ -431,7 +470,46 @@ case class Or(left: Expression, right: Expression) extends BinaryOperator with P
val eval2 = right.genCode(ctx)

// The result should be `true`, if any of them is `true` whenever the other is null or not.
if (!left.nullable && !right.nullable) {

// place generated code of eval1 and eval2 in separate methods if their code combined is large
val combinedLength = eval1.code.length + eval2.code.length
if (combinedLength > 1024 &&
// Split these expressions only if they are created from a row object
(ctx.INPUT_ROW != null && ctx.currentVars == null)) {

val (eval1FuncName, eval1GlobalIsNull, eval1GlobalValue) =
ctx.createAndAddFunction(eval1, BooleanType, "eval1Expr")
val (eval2FuncName, eval2GlobalIsNull, eval2GlobalValue) =
ctx.createAndAddFunction(eval2, BooleanType, "eval2Expr")
if (!left.nullable && !right.nullable) {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.value} = true;
if (!${eval1GlobalValue}) {
$eval2FuncName(${ctx.INPUT_ROW});
${ev.value} = ${eval2GlobalValue};
}
"""
ev.copy(code = generatedCode, isNull = "false")
} else {
val generatedCode = s"""
$eval1FuncName(${ctx.INPUT_ROW});
boolean ${ev.isNull} = false;
boolean ${ev.value} = true;
if (!${eval1GlobalIsNull} && ${eval1GlobalValue}) {
} else {
$eval2FuncName(${ctx.INPUT_ROW});
if (!${eval2GlobalIsNull} && ${eval2GlobalValue}) {
} else if (!${eval1GlobalIsNull} && !${eval2GlobalIsNull}) {
${ev.value} = false;
} else {
${ev.isNull} = true;
}
}
"""
ev.copy(code = generatedCode)
}
} else if (!left.nullable && !right.nullable) {
ev.isNull = "false"
ev.copy(code = s"""
${eval1.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,4 +324,43 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
// should not throw exception
projection(row)
}

test("SPARK-21720: split large predications into blocks due to JVM code size limit") {
val length = 600

val input = new GenericInternalRow(length)
val utf8Str = UTF8String.fromString(s"abc")
for (i <- 0 until length) {
input.update(i, utf8Str)
}

var exprOr: Expression = Literal(false)
for (i <- 0 until length) {
exprOr = Or(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprOr)
}

val planOr = GenerateMutableProjection.generate(Seq(exprOr))
val actualOr = planOr(input).toSeq(Seq(exprOr.dataType))
assert(actualOr.length == 1)
val expectedOr = false

if (!checkResult(actualOr.head, expectedOr, exprOr.dataType)) {
fail(s"Incorrect Evaluation: expressions: $exprOr, actual: $actualOr, expected: $expectedOr")
}

var exprAnd: Expression = Literal(true)
for (i <- 0 until length) {
exprAnd = And(EqualTo(BoundReference(i, StringType, true), Literal(s"c$i")), exprAnd)
}

val planAnd = GenerateMutableProjection.generate(Seq(exprAnd))
val actualAnd = planAnd(input).toSeq(Seq(exprAnd.dataType))
assert(actualAnd.length == 1)
val expectedAnd = false

if (!checkResult(actualAnd.head, expectedAnd, exprAnd.dataType)) {
fail(
s"Incorrect Evaluation: expressions: $exprAnd, actual: $actualAnd, expected: $expectedAnd")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2067,7 +2067,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
.count
}

testQuietly("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
test("SPARK-19372: Filter can be executed w/o generated code due to JVM code size limit") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this test case becomes invalid as we won't trigger the codegen fallback branch now. Can we just ignore this test and add a TODO to say something about the config?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I will do it on Sunday.

val N = 400
val rows = Seq(Row.fromSeq(Seq.fill(N)("string")))
val schema = StructType(Seq.tabulate(N)(i => StructField(s"_c$i", StringType)))
Expand All @@ -2081,10 +2081,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
}

withSQLConf(SQLConf.CODEGEN_FALLBACK.key -> "false") {
val e = intercept[SparkException] {
df.filter(filter).count()
}.getMessage
assert(e.contains("grows beyond 64 KB"))
// SPARK-21720 avoids an exception due to JVM code size limit
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should create a config for the threshold instead of hardcoding 1024, then we can keep the test case here, by setting the threshold to Long.max.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general, I agree with you that we should create a config.
Although I create a PR to add a config for a constant in CodeGenerator, it revealed that we need additional (large) work to fix active session management.

Can we introduce a config after fixing active session management?

df.filter(filter).count()
}
}

Expand Down