Skip to content

Commit

Permalink
address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kiszk committed Dec 15, 2017
1 parent a9d40e9 commit 31914c0
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class CodegenContext {
var currentVars: Seq[ExprCode] = null

/**
* Holding expressions' mutable states like `MonotonicallyIncreasingID.count` as a
* Holding expressions' inlined mutable states like `MonotonicallyIncreasingID.count` as a
* 2-tuple: java type, variable name.
* As an example, ("int", "count") will produce code:
* {{{
Expand All @@ -150,15 +150,22 @@ class CodegenContext {
val inlinedMutableStates: mutable.ArrayBuffer[(String, String)] =
mutable.ArrayBuffer.empty[(String, String)]

// An map keyed by mutable states' types holds the status of mutableStateArray
/**
* The mapping between mutable state types and corrseponding compacted arrays.
* The keys are java type string. The values are [[MutableStateArrays]] which encapsulates
* the compacted arrays for the mutable states with the same java type.
*/
val arrayCompactedMutableStates: mutable.Map[String, MutableStateArrays] =
mutable.Map.empty[String, MutableStateArrays]

// An array holds the code that will initialize each state
val mutableStateInitCode: mutable.ArrayBuffer[String] =
mutable.ArrayBuffer.empty[String]

// Holding names and current index of mutableStateArrays for a certain type
/**
* This class holds a set of names of mutableStateArrays that is used for compacting mutable
* states for a certain type, and holds the next available slot of the current compacted array.
*/
class MutableStateArrays {
val arrayNames = mutable.ListBuffer.empty[String]
createNewArray()
Expand All @@ -169,6 +176,11 @@ class CodegenContext {

def getCurrentIndex: Int = currentIndex

/**
* Returns the reference of next available slot in current compacted array. The size of each
* compacted array is controlled by the config `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* Once reaching the threshold, new compacted array is created.
*/
def getNextSlot(): String = {
if (currentIndex < CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT) {
val res = s"${arrayNames.last}[$currentIndex]"
Expand Down Expand Up @@ -199,17 +211,19 @@ class CodegenContext {
* compacted. Please set `true` into forceInline, if you want to access the
* status fast (e.g. frequently accessed) or if you want to use the original
* variable name
* @param useFreshName If false and inline is true, the name is not changed
* @return the name of the mutable state variable, which is either the original name if the
* variable is inlined to the outer class, or an array access if the variable is to be
* stored in an array of variables of the same type.
* There are two use cases. One is to use the original name for global variable instead
* of fresh name. Second is to use the original initialization statement since it is
* complex (e.g. allocate multi-dimensional array or object constructor has varibles).
* Primitive type variables will be inlined into outer class when the total number of
* mutable variables is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
* the max size of an array for compaction is given by
* `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
* @param useFreshName If this is false and forceInline is true, the name is not changed
* @return the name of the mutable state variable, which is the original name or fresh name if
* the variable is inlined to the outer class, or an array access if the variable is to
* be stored in an array of variables of the same type.
* A variable will be inlined into the outer class when one of the following conditions
* are satisfied:
* 1. forceInline is true
* 2. its type is primitive type and the total number of the inlined mutable variables
* is less than `CodeGenerator.OUTER_CLASS_VARIABLES_THRESHOLD`
* 3. its type is multi-dimensional array
* A primitive type variable will be inlined into outer class when the total number of
* When a variable is compacted into an array, the max size of the array for compaction
* is given by `CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT`.
*/
def addMutableState(
javaType: String,
Expand Down Expand Up @@ -1099,9 +1113,9 @@ class CodegenContext {
val commonExprs = equivalentExpressions.getAllEquivalentExprs.filter(_.size > 1)
commonExprs.foreach { e =>
val expr = e.head
val fnName = freshName("evalExpr")
val isNull = s"${fnName}IsNull"
val value = s"${fnName}Value"
val fnName = freshName("subExpr")
val isNull = addMutableState(JAVA_BOOLEAN, "subExprIsNull")
val value = addMutableState(javaType(expr.dataType), "subExprValue")

// Generate the code for this expression tree and wrap it in a function.
val eval = expr.genCode(this)
Expand All @@ -1127,8 +1141,6 @@ class CodegenContext {
// 2. Less code.
// Currently, we will do this for all non-leaf only expression trees (i.e. expr trees with
// at least two nodes) as the cost of doing it is expected to be low.
addMutableState(JAVA_BOOLEAN, isNull, forceInline = true, useFreshName = false)
addMutableState(javaType(expr.dataType), value, forceInline = true, useFreshName = false)

subexprFunctions += s"${addNewFunction(fnName, fn)}($INPUT_ROW);"
val state = SubExprEliminationState(isNull, value)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,4 +425,3 @@ class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {
assert(ctx2.mutableStateInitCode.size == CodeGenerator.MUTABLESTATEARRAY_SIZE_LIMIT + 10)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ case class SortExec(
// the iterator to return sorted rows.
val thisPlan = ctx.addReferenceObj("plan", this)
sorterVariable = ctx.addMutableState(classOf[UnsafeExternalRowSorter].getName, "sorter",
v => s"$v = $thisPlan.createSorter();")
v => s"$v = $thisPlan.createSorter();", forceInline = true)
val metrics = ctx.addMutableState(classOf[TaskMetrics].getName, "metrics",
v => s"$v = org.apache.spark.TaskContext.get().taskMetrics();")
v => s"$v = org.apache.spark.TaskContext.get().taskMetrics();", forceInline = true)
val sortedIterator = ctx.addMutableState("scala.collection.Iterator<UnsafeRow>", "sortedIter",
forceInline = true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ trait BaseLimitExec extends UnaryExecNode with CodegenSupport {
}

override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = {
val stopEarly = ctx.addMutableState(ctx.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = 0
val stopEarly = ctx.addMutableState(ctx.JAVA_BOOLEAN, "stopEarly") // init as stopEarly = false

ctx.addNewFunction("stopEarly", s"""
@Override
Expand Down

0 comments on commit 31914c0

Please sign in to comment.