Skip to content

Commit

Permalink
fixing mutable state for several classes
Browse files Browse the repository at this point in the history
  • Loading branch information
ALeksander Eskilson committed Mar 9, 2017
1 parent e7bdc53 commit abfd06f
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 35 deletions.
Expand Up @@ -178,8 +178,12 @@ class CodegenContext {
* variable is inlined to the class, or an array access if the variable is to be stored
* in an array of variables of the same type and initialization.
*/
def addMutableState(javaType: String, variableName: String, initCode: String): String = {
if (mutableStateCount > 10000 && variableName.matches(".*\\d+.*") &&
def addMutableState(
javaType: String,
variableName: String,
initCode: String,
inLine: Boolean = false): String = {
if (!inLine && variableName.matches(".*\\d+.*") &&
(initCode.matches("(^.*\\s*=\\s*null;$|^$)") || isPrimitiveType(javaType))) {
val initCodeKey = initCode.replaceAll(variableName, "*VALUE*")
if (mutableStateArrayIdx.contains((javaType, initCodeKey))) {
Expand Down
Expand Up @@ -268,17 +268,17 @@ abstract class HashExpression[E] extends Expression {

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
ev.isNull = "false"
val valueAccessor = ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
val childrenHash = ctx.splitExpressions(ctx.INPUT_ROW, children.map { child =>
val childGen = child.genCode(ctx)
childGen.code + ctx.nullSafeExec(child.nullable, childGen.isNull) {
computeHash(childGen.value, child.dataType, ev.value, ctx)
computeHash(childGen.value, child.dataType, valueAccessor, ctx)
}
})

val valueAccessor = ctx.addMutableState(ctx.javaType(dataType), ev.value, "")
ev.copy(code = s"""
$valueAccessor = $seed;
$childrenHash""")
$childrenHash""", value = valueAccessor)
}

protected def nullSafeElementHash(
Expand Down Expand Up @@ -612,7 +612,7 @@ case class HiveHash(children: Seq[Expression]) extends HashExpression[Int] {

ev.copy(code = s"""
$valueAccessor = $seed;
$childrenHash""")
$childrenHash""", value = valueAccessor)
}

override def eval(input: InternalRow = null): Int = {
Expand Down
Expand Up @@ -339,7 +339,7 @@ case class NewInstance(
${outer.map(_.code).getOrElse("")}
$valueAccessor = ${ev.isNull} ? ${ctx.defaultValue(javaType)} : $constructorCall;
"""
ev.copy(code = code)
ev.copy(code = code, value = valueAccessor)
}

override def toString: String = s"newInstance($cls)"
Expand Down Expand Up @@ -418,27 +418,17 @@ case class WrapOption(child: Expression, optType: DataType)
case class LambdaVariable(
value: String,
isNull: String,
loopValuesMap: mutable.Map[String, String],
dataType: DataType,
nullable: Boolean = true) extends LeafExpression
with Unevaluable with NonSQLExpression {

override def genCode(ctx: CodegenContext): ExprCode = {
val valueAccessor = loopValuesMap.getOrElseUpdate(value,
ctx.addMutableState(ctx.javaType(dataType), value, ""))
val isNullAccessor = loopValuesMap.getOrElseUpdate(isNull,
ctx.addMutableState("boolean", isNull, ""))
ExprCode(code = "", value = valueAccessor, isNull = if (nullable) isNullAccessor else "false")
ExprCode(code = "", value = value, isNull = if (nullable) isNull else "false")
}
}

object MapObjects {
private val curId = new java.util.concurrent.atomic.AtomicInteger()
// Since the loopValue and loopIsNull mutable state may be compacted into an array of their
// corresponding types, we keep a map between the variable name and its accessor, which is
// either the same name, or an array-access, such that state may be properly assigned between
// the lambdaFunction and the body of `MapObjects`
private val loopValuesMap: mutable.Map[String, String] = mutable.Map.empty[String, String]

/**
* Construct an instance of MapObjects case class.
Expand All @@ -453,8 +443,8 @@ object MapObjects {
elementType: DataType): MapObjects = {
val loopValue = "MapObjects_loopValue" + curId.getAndIncrement()
val loopIsNull = "MapObjects_loopIsNull" + curId.getAndIncrement()
val loopVar = LambdaVariable(loopValue, loopIsNull, loopValuesMap, elementType)
MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)(loopValuesMap)
val loopVar = LambdaVariable(loopValue, loopIsNull, elementType)
MapObjects(loopValue, loopIsNull, elementType, function(loopVar), inputData)
}
}

Expand All @@ -475,16 +465,13 @@ object MapObjects {
* @param lambdaFunction A function that take the `loopVar` as input, and used as lambda function
* to handle collection elements.
* @param inputData An expression that when evaluated returns a collection object.
* @param loopValuesMap a map holding the name or array-accessor for the mutable state of loopValue
* and loopIsNull variables.
*/
case class MapObjects private(
loopValue: String,
loopIsNull: String,
loopVarDataType: DataType,
lambdaFunction: Expression,
inputData: Expression)
(loopValuesMap: mutable.Map[String, String] = mutable.Map.empty[String, String])
extends Expression with NonSQLExpression {

override def nullable: Boolean = inputData.nullable
Expand All @@ -497,14 +484,10 @@ case class MapObjects private(
override def dataType: DataType =
ArrayType(lambdaFunction.dataType, containsNull = lambdaFunction.nullable)

override protected def otherCopyArgs: Seq[AnyRef] = loopValuesMap :: Nil

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val elementJavaType = ctx.javaType(loopVarDataType)
val loopIsNullAccessor = loopValuesMap.getOrElseUpdate(loopIsNull,
ctx.addMutableState("boolean", loopIsNull, ""))
val loopValueAccessor = loopValuesMap.getOrElseUpdate(loopValue,
ctx.addMutableState(elementJavaType, loopValue, ""))
val loopIsNullAccessor = ctx.addMutableState("boolean", loopIsNull, "", inLine = true)
val loopValueAccessor = ctx.addMutableState(elementJavaType, loopValue, "", inLine = true)
val genInputData = inputData.genCode(ctx)
val genFunction = lambdaFunction.genCode(ctx)
val dataLength = ctx.freshName("dataLength")
Expand Down Expand Up @@ -634,12 +617,12 @@ object ExternalMapToCatalyst {
keyName,
keyType,
keyConverter(
LambdaVariable(keyName, "false", mapValuesMap, keyType, false)),
LambdaVariable(keyName, "false", keyType, false)),
valueName,
valueIsNull,
valueType,
valueConverter(
LambdaVariable(valueName, valueIsNull, mapValuesMap, valueType, valueNullable)),
LambdaVariable(valueName, valueIsNull, valueType, valueNullable)),
inputMap
)
}
Expand Down Expand Up @@ -953,7 +936,7 @@ case class InitializeJavaBean(beanInstance: Expression, setters: Map[String, Exp
$initializeCode
}
"""
ev.copy(code = code, isNull = instanceGen.isNull, value = instanceGen.value)
ev.copy(code = code, isNull = instanceGen.isNull, value = javaBeanInstanceAccessor)
}
}

Expand Down
Expand Up @@ -88,9 +88,11 @@ private[sql] trait ColumnarBatchScan extends CodegenSupport {
val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
val idx = ctx.freshName("batchIdx")
val idxAccessor = ctx.addMutableState("int", idx, s"$idx = 0;")
val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
val nameAccessor = ctx.addMutableState(columnVectorClz, name, s"$name = null;")
val colVars = output.indices.map(i => {
val name = ctx.freshName("colInstance" + i)
ctx.addMutableState(columnVectorClz, name, s"$name = null;")
})
val columnAssigns = colVars.zipWithIndex.map { case (nameAccessor, i) =>
s"$nameAccessor = $batchAccessor.column($i);"
}

Expand Down

0 comments on commit abfd06f

Please sign in to comment.