Skip to content

Commit

Permalink
[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - C…
Browse files Browse the repository at this point in the history
…lass Splitting

## What changes were proposed in this pull request?

This pull-request exclusively includes the class splitting feature described in apache#16648. When code for a given class would grow beyond 1600k bytes, a private, nested sub-class is generated into which subsequent functions are inlined. Additional sub-classes are generated as the code threshold is met subsequent times. This code includes 3 changes:

1. Includes helper maps, lists, and functions for keeping track of sub-classes during code generation (included in the `CodeGenerator` class). These helper functions allow nested classes and split functions to be initialized/declared/inlined to the appropriate locations in the various projection classes.
2. Changes `addNewFunction` to return a string to support instances where a split function is inlined to a nested class and not the outer class (and so must be invoked using the class-qualified name). Uses of `addNewFunction` throughout the codebase are modified so that the returned name is properly used.
3. Removes instances of the `this` keyword when used on data inside generated classes. All state declared in the outer class is by default global and accessible to the nested classes. However, if a reference to global state in a nested class is prepended with the `this` keyword, it would attempt to reference state belonging to the nested class (which would not exist), rather than the correct variable belonging to the outer class.

## How was this patch tested?

Added a test case to the `GeneratedProjectionSuite` that increases the number of columns tested in various projections to a threshold that would previously have triggered a `JaninoRuntimeException` for the Constant Pool.

Note: This PR does not address the second Constant Pool issue with code generation (also mentioned in apache#16648): excess global mutable state. A second PR may be opened to resolve that issue.

Author: ALeksander Eskilson <alek.eskilson@cerner.com>

Closes apache#18075 from bdrillard/class_splitting_only.
  • Loading branch information
ALeksander Eskilson authored and cloud-fan committed Jun 15, 2017
1 parent 2051428 commit b32b212
Show file tree
Hide file tree
Showing 22 changed files with 259 additions and 81 deletions.
7 changes: 7 additions & 0 deletions sql/catalyst/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
</configuration>
</plugin>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -988,7 +988,7 @@ case class ScalaUDF(
val converterTerm = ctx.freshName("converter")
val expressionIdx = ctx.references.size - 1
ctx.addMutableState(converterClassName, converterTerm,
s"this.$converterTerm = ($converterClassName)$typeConvertersClassName" +
s"$converterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToScalaConverter(((${expressionClassName})((($scalaUDFClassName)" +
s"references[$expressionIdx]).getChildren().apply($index))).dataType());")
converterTerm
Expand All @@ -1005,7 +1005,7 @@ case class ScalaUDF(
// Generate codes used to convert the returned value of user-defined functions to Catalyst type
val catalystConverterTerm = ctx.freshName("catalystConverter")
ctx.addMutableState(converterClassName, catalystConverterTerm,
s"this.$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s"$catalystConverterTerm = ($converterClassName)$typeConvertersClassName" +
s".createToCatalystConverter($scalaUDF.dataType());")

val resultTerm = ctx.freshName("result")
Expand All @@ -1019,7 +1019,7 @@ case class ScalaUDF(

val funcTerm = ctx.freshName("udf")
ctx.addMutableState(funcClassName, funcTerm,
s"this.$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")
s"$funcTerm = ($funcClassName)$scalaUDF.userDefinedFunc();")

// codegen for children expressions
val evals = children.map(_.genCode(ctx))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import scala.util.control.NonFatal

import com.google.common.cache.{CacheBuilder, CacheLoader}
import com.google.common.util.concurrent.{ExecutionError, UncheckedExecutionException}
import org.apache.commons.lang3.exception.ExceptionUtils
import org.codehaus.commons.compiler.CompileException
import org.codehaus.janino.{ByteArrayClassLoader, ClassBodyEvaluator, JaninoRuntimeException, SimpleCompiler}
import org.codehaus.janino.util.ClassFile
Expand Down Expand Up @@ -113,7 +112,7 @@ class CodegenContext {
val idx = references.length
references += obj
val clsName = Option(className).getOrElse(obj.getClass.getName)
addMutableState(clsName, term, s"this.$term = ($clsName) references[$idx];")
addMutableState(clsName, term, s"$term = ($clsName) references[$idx];")
term
}

Expand Down Expand Up @@ -202,16 +201,6 @@ class CodegenContext {
partitionInitializationStatements.mkString("\n")
}

/**
* Holding all the functions those will be added into generated class.
*/
val addedFunctions: mutable.Map[String, String] =
mutable.Map.empty[String, String]

def addNewFunction(funcName: String, funcCode: String): Unit = {
addedFunctions += ((funcName, funcCode))
}

/**
* Holds expressions that are equivalent. Used to perform subexpression elimination
* during codegen.
Expand All @@ -233,10 +222,118 @@ class CodegenContext {
// The collection of sub-expression result resetting methods that need to be called on each row.
val subexprFunctions = mutable.ArrayBuffer.empty[String]

def declareAddedFunctions(): String = {
addedFunctions.map { case (funcName, funcCode) => funcCode }.mkString("\n")
val outerClassName = "OuterClass"

/**
* Holds the class and instance names to be generated, where `OuterClass` is a placeholder
* standing for whichever class is generated as the outermost class and which will contain any
* nested sub-classes. All other classes and instance names in this list will represent private,
* nested sub-classes.
*/
private val classes: mutable.ListBuffer[(String, String)] =
mutable.ListBuffer[(String, String)](outerClassName -> null)

// A map holding the current size in bytes of each class to be generated.
private val classSize: mutable.Map[String, Int] =
mutable.Map[String, Int](outerClassName -> 0)

// Nested maps holding function names and their code belonging to each class.
private val classFunctions: mutable.Map[String, mutable.Map[String, String]] =
mutable.Map(outerClassName -> mutable.Map.empty[String, String])

// Returns the size of the most recently added class.
private def currClassSize(): Int = classSize(classes.head._1)

// Returns the class name and instance name for the most recently added class.
private def currClass(): (String, String) = classes.head

// Adds a new class. Requires the class' name, and its instance name.
private def addClass(className: String, classInstance: String): Unit = {
classes.prepend(className -> classInstance)
classSize += className -> 0
classFunctions += className -> mutable.Map.empty[String, String]
}

/**
* Adds a function to the generated class. If the code for the `OuterClass` grows too large, the
* function will be inlined into a new private, nested class, and a class-qualified name for the
* function will be returned. Otherwise, the function will be inined to the `OuterClass` the
* simple `funcName` will be returned.
*
* @param funcName the class-unqualified name of the function
* @param funcCode the body of the function
* @param inlineToOuterClass whether the given code must be inlined to the `OuterClass`. This
* can be necessary when a function is declared outside of the context
* it is eventually referenced and a returned qualified function name
* cannot otherwise be accessed.
* @return the name of the function, qualified by class if it will be inlined to a private,
* nested sub-class
*/
def addNewFunction(
funcName: String,
funcCode: String,
inlineToOuterClass: Boolean = false): String = {
// The number of named constants that can exist in the class is limited by the Constant Pool
// limit, 65,536. We cannot know how many constants will be inserted for a class, so we use a
// threshold of 1600k bytes to determine when a function should be inlined to a private, nested
// sub-class.
val (className, classInstance) = if (inlineToOuterClass) {
outerClassName -> ""
} else if (currClassSize > 1600000) {
val className = freshName("NestedClass")
val classInstance = freshName("nestedClassInstance")

addClass(className, classInstance)

className -> classInstance
} else {
currClass()
}

classSize(className) += funcCode.length
classFunctions(className) += funcName -> funcCode

if (className == outerClassName) {
funcName
} else {

s"$classInstance.$funcName"
}
}

/**
* Instantiates all nested, private sub-classes as objects to the `OuterClass`
*/
private[sql] def initNestedClasses(): String = {
// Nested, private sub-classes have no mutable state (though they do reference the outer class'
// mutable state), so we declare and initialize them inline to the OuterClass.
classes.filter(_._1 != outerClassName).map {
case (className, classInstance) =>
s"private $className $classInstance = new $className();"
}.mkString("\n")
}

/**
* Declares all function code that should be inlined to the `OuterClass`.
*/
private[sql] def declareAddedFunctions(): String = {
classFunctions(outerClassName).values.mkString("\n")
}

/**
* Declares all nested, private sub-classes and the function code that should be inlined to them.
*/
private[sql] def declareNestedClasses(): String = {
classFunctions.filterKeys(_ != outerClassName).map {
case (className, functions) =>
s"""
|private class $className {
| ${functions.values.mkString("\n")}
|}
""".stripMargin
}
}.mkString("\n")

final val JAVA_BOOLEAN = "boolean"
final val JAVA_BYTE = "byte"
final val JAVA_SHORT = "short"
Expand Down Expand Up @@ -556,8 +653,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case schema: StructType =>
val comparisons = GenerateOrdering.genComparisons(this, schema)
val compareFunc = freshName("compareStruct")
Expand All @@ -573,8 +669,7 @@ class CodegenContext {
return 0;
}
"""
addNewFunction(compareFunc, funcCode)
s"this.$compareFunc($c1, $c2)"
s"${addNewFunction(compareFunc, funcCode)}($c1, $c2)"
case other if other.isInstanceOf[AtomicType] => s"$c1.compare($c2)"
case udt: UserDefinedType[_] => genComp(udt.sqlType, c1, c2)
case _ =>
Expand Down Expand Up @@ -629,7 +724,9 @@ class CodegenContext {

/**
* Splits the generated code of expressions into multiple functions, because function has
* 64kb code size limit in JVM
* 64kb code size limit in JVM. If the class to which the function would be inlined would grow
* beyond 1600kb, we declare a private, nested sub-class, and the function is inlined to it
* instead, because classes have a constant pool limit of 65,536 named values.
*
* @param row the variable name of row that is used by expressions
* @param expressions the codes to evaluate expressions.
Expand Down Expand Up @@ -689,7 +786,6 @@ class CodegenContext {
|}
""".stripMargin
addNewFunction(name, code)
name
}

foldFunctions(functions.map(name => s"$name(${arguments.map(_._2).mkString(", ")})"))
Expand Down Expand Up @@ -773,8 +869,6 @@ class CodegenContext {
|}
""".stripMargin

addNewFunction(fnName, fn)

// Add a state and a mapping of the common subexpressions that are associate with this
// state. Adding this expression to subExprEliminationExprMap means it will call `fn`
// when it is code generated. This decision should be a cost based one.
Expand All @@ -792,7 +886,7 @@ class CodegenContext {
addMutableState(javaType(expr.dataType), value,
s"$value = ${defaultValue(expr.dataType)};")

subexprFunctions += s"$fnName($INPUT_ROW);"
subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
e.foreach(subExprEliminationExprs.put(_, state))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,21 +63,21 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
if (e.nullable) {
val isNull = s"isNull_$i"
val value = s"value_$i"
ctx.addMutableState("boolean", isNull, s"this.$isNull = true;")
ctx.addMutableState("boolean", isNull, s"$isNull = true;")
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$isNull = ${ev.isNull};
this.$value = ${ev.value};
$isNull = ${ev.isNull};
$value = ${ev.value};
"""
} else {
val value = s"value_$i"
ctx.addMutableState(ctx.javaType(e.dataType), value,
s"this.$value = ${ctx.defaultValue(e.dataType)};")
s"$value = ${ctx.defaultValue(e.dataType)};")
s"""
${ev.code}
this.$value = ${ev.value};
$value = ${ev.value};
"""
}
}
Expand All @@ -87,7 +87,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP

val updates = validExpr.zip(index).map {
case (e, i) =>
val ev = ExprCode("", s"this.isNull_$i", s"this.value_$i")
val ev = ExprCode("", s"isNull_$i", s"value_$i")
ctx.updateColumn("mutableRow", e.dataType, i, ev, e.nullable)
}

Expand Down Expand Up @@ -135,6 +135,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableP
$allUpdates
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR
$comparisons
return 0;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ object GeneratePredicate extends CodeGenerator[Expression, Predicate] {
${eval.code}
return !${eval.isNull} && ${eval.value};
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}"""

val code = CodeFormatter.stripOverlappingComments(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val output = ctx.freshName("safeRow")
val values = ctx.freshName("values")
// These expressions could be split into multiple functions
ctx.addMutableState("Object[]", values, s"this.$values = null;")
ctx.addMutableState("Object[]", values, s"$values = null;")

val rowClass = classOf[GenericInternalRow].getName

Expand All @@ -65,10 +65,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
val allFields = ctx.splitExpressions(tmp, fieldWriters)
val code = s"""
final InternalRow $tmp = $input;
this.$values = new Object[${schema.length}];
$values = new Object[${schema.length}];
$allFields
final InternalRow $output = new $rowClass($values);
this.$values = null;
$values = null;
"""

ExprCode(code, "false", output)
Expand Down Expand Up @@ -184,6 +184,9 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
$allExpressions
return mutableRow;
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val rowWriterClass = classOf[UnsafeRowWriter].getName
val rowWriter = ctx.freshName("rowWriter")
ctx.addMutableState(rowWriterClass, rowWriter,
s"this.$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")
s"$rowWriter = new $rowWriterClass($bufferHolder, ${inputs.length});")

val resetWriter = if (isTopLevel) {
// For top level row writer, it always writes to the beginning of the global buffer holder,
Expand Down Expand Up @@ -182,7 +182,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val arrayWriterClass = classOf[UnsafeArrayWriter].getName
val arrayWriter = ctx.freshName("arrayWriter")
ctx.addMutableState(arrayWriterClass, arrayWriter,
s"this.$arrayWriter = new $arrayWriterClass();")
s"$arrayWriter = new $arrayWriterClass();")
val numElements = ctx.freshName("numElements")
val index = ctx.freshName("index")
val element = ctx.freshName("element")
Expand Down Expand Up @@ -321,7 +321,7 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
val holder = ctx.freshName("holder")
val holderClass = classOf[BufferHolder].getName
ctx.addMutableState(holderClass, holder,
s"this.$holder = new $holderClass($result, ${numVarLenFields * 32});")
s"$holder = new $holderClass($result, ${numVarLenFields * 32});")

val resetBufferHolder = if (numVarLenFields == 0) {
""
Expand Down Expand Up @@ -402,6 +402,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro
${eval.code.trim}
return ${eval.value};
}

${ctx.initNestedClasses()}
${ctx.declareNestedClasses()}
}
"""

Expand Down
Loading

0 comments on commit b32b212

Please sign in to comment.