Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-22498][SQL] Fix 64KB JVM bytecode limit problem with concat #19728

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,28 @@ case class Concat(children: Seq[Expression]) extends Expression with ImplicitCas

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val evals = children.map(_.genCode(ctx))
val inputs = evals.map { eval =>
s"${eval.isNull} ? null : ${eval.value}"
}.mkString(", ")
ev.copy(evals.map(_.code).mkString("\n") + s"""
boolean ${ev.isNull} = false;
UTF8String ${ev.value} = UTF8String.concat($inputs);
if (${ev.value} == null) {
${ev.isNull} = true;
val argNums = evals.length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: numArgs.

val args = ctx.freshName("argLen")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

argLen? I think it's not length of args?

ctx.addMutableState("UTF8String[]", args, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can avoid defining this as a global variable, what do you think?


val inputs = evals.zipWithIndex.map { case (eval, index) =>
if (eval.isNull != "true") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If eval.isNull is not a pre-evaluated constant?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If eval.isNull is not a pre-evaluated constant, I expect that the following code at lines 73-74 will be generated. Only when we ensure it is true, we can avoid assigning a value (use default null value).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please do not mix in optimizations with bug fix

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. I will remove this.

s"""
${eval.code}
if (!${eval.isNull}) {
$args[$index] = ${eval.value};
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are better to assign null too if eval.isNull == null, because args is global variable and we need to override all values for previous row.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, while args is global, UTF8String[] is allocated before executing these assignments. Thus, we can ensure all of elements in args are null.
What do you think?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is guaranteed before the first row processed. After we process rows, args are updated for each row. E.g., args[0] can be updated and assigned with a string for row 0. In next row, if eval.isNull is evaluated to true, we don't override back to null, so arg[0] is still the string value for row 0.

Copy link
Member Author

@kiszk kiszk Nov 13, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the next row is processed, $args = new UTF8String[$argNums] is executed again in apply() method. In other words, I think that current implementation does not reuse UTF8String[] between different rows.
Do you mean UTF8String[] is reused between different rows?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. I didn't notice that args is not globally initialized.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for taking your time for my code.
It is not easy to imagine the generated code. I may have to put the generated code.

"""
} else {
""
}
}
val codes = ctx.splitExpressions(ctx.INPUT_ROW, inputs)
ev.copy(s"""
$args = new UTF8String[$argNums];
$codes
UTF8String ${ev.value} = UTF8String.concat($args);
boolean ${ev.isNull} = ${ev.value} == null;
""")
}
}
Expand Down Expand Up @@ -126,18 +139,36 @@ case class ConcatWs(children: Seq[Expression])
// All children are strings. In that case we can construct a fixed size array.
val evals = children.map(_.genCode(ctx))

val inputs = evals.map { eval =>
s"${eval.isNull} ? (UTF8String) null : ${eval.value}"
}.mkString(", ")

ev.copy(evals.map(_.code).mkString("\n") + s"""
UTF8String ${ev.value} = UTF8String.concatWs($inputs);
val argNums = evals.length
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be evals.length -1, or even better, I'd suggest to declare two variables: sep and strings (or the name you prefer) to hold head and tail. This would help readability too IMHO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am sorry that I cannot understand your suggestion.
argNums is referred as $argNums to allocate an array. Are you suggesting to allocate an array as new UTF8String[$argNums + 1]?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, I am saying the opposite. Let's consider an example, maybe I better can explain what I mean in this way. Let's assume that we are running concat_ws(',', 'a', 'b'). Then, evals would contain 3 elements. So here your argNums would be 3. But here you would be using only $args[0] and $args[1], because the first element (',', the separator) is handled differently.
Thus, I would suggest to have something like:

val sep = evals.head
val strings = evals.tail
val argNums = strings.length // note that this is evals.length -1
...

I think that in this way the code would be much clearer (other than fixing this little bug).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your kindly explanation. I totally agree with you.

val args = ctx.freshName("argLen")
ctx.addMutableState("UTF8String[]", args, "")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this also can be kept method local, IMHO


val inputs = evals.tail.zipWithIndex.map { case (eval, index) =>
if (eval.isNull != "true") {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s"""
${eval.code}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: in this and the next 4 lines an indentation space is missing if I am not wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch

if (!${eval.isNull}) {
$args[$index] = ${eval.value};
}
"""
} else {
""
}
}
val codes = s"${evals.head.code}\n" + ctx.splitExpressions(ctx.INPUT_ROW, inputs)
ev.copy(s"""
$args = new UTF8String[$argNums];
$codes
UTF8String ${ev.value} = UTF8String.concatWs(${evals.head.value}, $args);
boolean ${ev.isNull} = ${ev.value} == null;
""")
} else {
val array = ctx.freshName("array")
ctx.addMutableState("UTF8String[]", array, "")
val varargNum = ctx.freshName("varargNum")
ctx.addMutableState("int", varargNum, "")
val idxInVararg = ctx.freshName("idxInVararg")
ctx.addMutableState("int", idxInVararg, "")

val evals = children.map(_.genCode(ctx))
val (varargCount, varargBuild) = children.tail.zip(evals.tail).map { case (child, eval) =>
Expand All @@ -163,13 +194,17 @@ case class ConcatWs(children: Seq[Expression])
}
}.unzip

ev.copy(evals.map(_.code).mkString("\n") +
val codes = ctx.splitExpressions(ctx.INPUT_ROW, evals.map(_.code))
val varargCounts = ctx.splitExpressions(ctx.INPUT_ROW, varargCount)
val varargBuilds = ctx.splitExpressions(ctx.INPUT_ROW, varargBuild)
ev.copy(
s"""
int $varargNum = ${children.count(_.dataType == StringType) - 1};
int $idxInVararg = 0;
${varargCount.mkString("\n")}
UTF8String[] $array = new UTF8String[$varargNum];
${varargBuild.mkString("\n")}
$codes
$varargNum = ${children.count(_.dataType == StringType) - 1};
$idxInVararg = 0;
$varargCounts
$array = new UTF8String[$varargNum];
$varargBuilds
UTF8String ${ev.value} = UTF8String.concatWs(${evals.head.value}, $array);
boolean ${ev.isNull} = ${ev.value} == null;
""")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on
}

test("SPARK-22498: Concat should not generate codes beyond 64KB") {
val N = 5000
val strs = (1 to N).map(x => s"x_$x")
checkEvaluation(Concat(strs.map(Literal.create(_, StringType))), strs.mkString, EmptyRow)
}

test("concat_ws") {
def testConcatWs(expected: String, sep: String, inputs: Any*): Unit = {
val inputExprs = inputs.map {
Expand Down Expand Up @@ -74,6 +80,19 @@ class StringExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
// scalastyle:on
}

test("SPARK-22498: ConcatWs should not generate codes beyond 64KB") {
val N = 5000
val sepExpr = Literal.create("#", StringType)
val strings1 = (1 to N).map(x => s"s$x")
val inputsExpr1 = strings1.map(Literal.create(_, StringType))
checkEvaluation(ConcatWs(sepExpr +: inputsExpr1), strings1.mkString("#"), EmptyRow)

val strings2 = (1 to N).map(x => Seq(s"s$x"))
val inputsExpr2 = strings2.map(Literal.create(_, ArrayType(StringType)))
checkEvaluation(
ConcatWs(sepExpr +: inputsExpr2), strings2.map(s => s(0)).mkString("#"), EmptyRow)
}

test("elt") {
def testElt(result: String, n: java.lang.Integer, args: String*): Unit = {
checkEvaluation(
Expand Down