diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala index e9db1e315f229..6851525e15d33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala @@ -137,7 +137,7 @@ class CodegenContext { var currentVars: Seq[ExprCode] = null /** - * Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a + * Holding expressions' inlined mutable states like `MonotonicallyIncreasingID.count` as a * 2-tuple: java type, variable name. * As an example, ("int", "count") will produce code: * {{{ @@ -150,7 +150,11 @@ class CodegenContext { val inlinedMutableStates: mutable.ArrayBuffer[(String, String)] = mutable.ArrayBuffer.empty[(String, String)] - // An map keyed by mutable states' types holds the status of mutableStateArray + /** + * The mapping between mutable state types and corrseponding compacted arrays. + * The keys are java type string. The values are [[MutableStateArrays]] which encapsulates + * the compacted arrays for the mutable states with the same java type. + */ val arrayCompactedMutableStates: mutable.Map[String, MutableStateArrays] = mutable.Map.empty[String, MutableStateArrays] @@ -158,7 +162,10 @@ class CodegenContext { val mutableStateInitCode: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty[String] - // Holding names and current index of mutableStateArrays for a certain type + /** + * This class holds a set of names of mutableStateArrays that is used for compacting mutable + * states for a certain type, and holds the next available slot of the current compacted array. + */ class MutableStateArrays { val arrayNames = mutable.ListBuffer.empty[String] createNewArray() @@ -169,6 +176,11 @@ class CodegenContext { def getCurrentIndex: Int = currentIndex + /** + * Returns the reference of next available slot in current compacted array. The size of each + * compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. + * Once reaching the threshold, new compacted array is created. + */ def getNextSlot(): String = { if (currentIndex < CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT) { val res = s"${arrayNames.last}[$currentIndex]" @@ -199,17 +211,19 @@ class CodegenContext { * compacted. Please set `true` into forceInline, if you want to access the * status fast (e.g. frequently accessed) or if you want to use the original * variable name - * @param useFreshName If false and inline is true, the name is not changed - * @return the name of the mutable state variable, which is either the original name if the - * variable is inlined to the outer class, or an array access if the variable is to be - * stored in an array of variables of the same type. - * There are two use cases. One is to use the original name for global variable instead - * of fresh name. Second is to use the original initialization statement since it is - * complex (e.g. allocate multi-dimensional array or object constructor has varibles). - * Primitive type variables will be inlined into outer class when the total number of - * mutable variables is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD` - * the max size of an array for compaction is given by - * `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. + * @param useFreshName If this is false and forceInline is true, the name is not changed + * @return the name of the mutable state variable, which is the original name or fresh name if + * the variable is inlined to the outer class, or an array access if the variable is to + * be stored in an array of variables of the same type. + * A variable will be inlined into the outer class when one of the following conditions + * are satisfied: + * 1. forceInline is true + * 2. its type is primitive type and the total number of the inlined mutable variables + * is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD` + * 3. its type is multi-dimensional array + * A primitive type variable will be inlined into outer class when the total number of + * When a variable is compacted into an array, the max size of the array for compaction + * is given by `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`. */ def addMutableState( javaType: String, @@ -1099,9 +1113,9 @@ class CodegenContext { val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1) commonExprs.foreach { e => val expr = e.head - val fnName = freshName("evalExpr") - val isNull = s"${fnName}IsNull" - val value = s"${fnName}Value" + val fnName = freshName("subExpr") + val isNull = addMutableState(JAVA_BOOLEAN, "subExprIsNull") + val value = addMutableState(javaType(expr.dataType), "subExprValue") // Generate the code for this expression tree and wrap it in a function. val eval = expr.genCode(this) @@ -1127,8 +1141,6 @@ class CodegenContext { // 2. Less code. // Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with // at least two nodes) as the cost of doing it is expected to be low. - addMutableState(JAVA_BOOLEAN, isNull, forceInline = true, useFreshName = false) - addMutableState(javaType(expr.dataType), value, forceInline = true, useFreshName = false) subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);" val state = SubExprEliminationState(isNull, value) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 27878f54f24e3..b1a44528e64d7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -425,4 +425,3 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { assert(ctx2.mutableStateInitCode.size == CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT + 10) } } - diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala index a8246de39132d..996598055b1ba 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortExec.scala @@ -139,9 +139,9 @@ case class SortExec( // the iterator to return sorted rows. val thisPlan = ctx.addReferenceObj("plan", this) sorterVariable = ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, "sorter", - v => s"$v = $thisPlan.createSorter();") + v => s"$v = $thisPlan.createSorter();", forceInline = true) val metrics = ctx.addMutableState(classOf[TaskMetrics].getName, "metrics", - v => s"$v = org.apache.spark.TaskContext.get().taskMetrics();") + v => s"$v = org.apache.spark.TaskContext.get().taskMetrics();", forceInline = true) val sortedIterator = ctx.addMutableState("scala.collection.Iterator", "sortedIter", forceInline = true) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index c168637fc9768..cccee63bc0680 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -71,7 +71,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport { } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { - val stopEarly = ctx.addMutableState(ctx.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = 0 + val stopEarly = ctx.addMutableState(ctx.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false ctx.addNewFunction("stopEarly", s""" @Override