[SPARK-14393][SQL] values generated by non-deterministic functions shouldn't change after coalesce or union #15567

Closed
wants to merge 19 commits
Changes from 15 commits
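For context, a minimal, hedged sketch of the class of behavior this PR addresses (a hypothetical local session; exact output depends on the Spark version). Before this change, non-deterministic expressions derived their per-partition state from TaskContext.getPartitionId() at execution time, so a downstream coalesce or union that folds several partitions into one task could change the generated values.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.monotonically_increasing_id

val spark = SparkSession.builder().master("local[4]").appName("SPARK-14393-sketch").getOrCreate()
val df = spark.range(0, 8, 1, 4).withColumn("gen_id", monotonically_increasing_id())
df.show()               // ids are derived from each row's original partition
df.coalesce(1).show()   // pre-patch: all rows may see task partition 0, so the generated ids can collide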
16 changes: 14 additions & 2 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -787,14 +787,26 @@ abstract class RDD[T: ClassTag](
}

/**
* [performance] Spark's internal mapPartitions method which skips closure cleaning. It is a
* performance API to be used carefully only if we are sure that the RDD elements are
* [performance] Spark's internal mapPartitionsWithIndex method that skips closure cleaning.
* It is a performance API to be used carefully only if we are sure that the RDD elements are
* serializable and don't require closure cleaning.
*
* @param preservesPartitioning indicates whether the input function preserves the partitioner,
* which should be `false` unless this is a pair RDD and the input function doesn't modify
* the keys.
*/
private[spark] def mapPartitionsWithIndexInternal[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
new MapPartitionsRDD(
this,
(context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter),
preservesPartitioning)
}

/**
* [performance] Spark's internal mapPartitions method that skips closure cleaning.
*/
private[spark] def mapPartitionsInternal[U: ClassTag](
Contributor

can we get rid of this?

Contributor Author

There are 20+ probably valid uses of mapPartitionsInternal. The main problem is that changing it to mapPartitionsWithIndexInternal doesn't really force people to initialize the partition.

f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] = withScope {
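As a sketch of how the new method is meant to be used, the same pattern can be written against the public mapPartitionsWithIndex (the internal variant above differs only in skipping closure cleaning): the partition index handed to the closure is what per-partition state, such as the seeds of non-deterministic expressions, should be derived from.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))
val tagged = sc.parallelize(1 to 6, numSlices = 3).mapPartitionsWithIndex { (index, iter) =>
  // state seeded from `index` stays stable even if this partition is re-evaluated
  // inside a coalesced task
  iter.map(value => (index, value))
}
tagged.collect()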
@@ -274,12 +274,12 @@ trait Nondeterministic extends Expression {

private[this] var initialized = false
Contributor

Should we change this to transient? Then it will always get reset to false on a new partition.

Contributor Author

will do
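A self-contained sketch of the reviewer's point (plain Java serialization here; Spark may also ship objects via Kryo): a @transient field is not serialized, so the copy that reaches an executor starts out with the default value false and must be re-initialized for the partition it is about to process.

import java.io._

class Flagged extends Serializable {
  @transient private var initialized = false
  def init(): Unit = { initialized = true }
  def isInitialized: Boolean = initialized
}

def roundTrip[T <: AnyRef](obj: T): T = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(obj)
  out.flush()
  new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)).readObject().asInstanceOf[T]
}

val f = new Flagged
f.init()
assert(f.isInitialized)
assert(!roundTrip(f).isInitialized)  // the transient flag is reset to false after the round trip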


final def setInitialValues(): Unit = {
initInternal()
final def initializeStatesForPartition(partitionIndex: Int): Unit = {
Contributor

While you are at it, it'd be great to add some comments documenting the function...

Contributor

How about just naming this "initialize"? It is fairly long right now...

And we just document that initialize must be called prior to task execution on a partition.

Contributor Author

I don't want to overload the name initialize, which is a little vague; how about initStates? Again, the issue is that even with comments we cannot force users to initialize it.

initializeStatesForPartitionInternal(partitionIndex)
initialized = true
}

protected def initInternal(): Unit
protected def initializeStatesForPartitionInternal(partitionIndex: Int): Unit

final override def eval(input: InternalRow = null): Any = {
require(initialized, "nondeterministic expression should be initialized before evaluate")
@@ -37,7 +37,7 @@ case class InputFileName() extends LeafExpression with Nondeterministic {

override def prettyName: String = "input_file_name"

override protected def initInternal(): Unit = {}
override protected def initializeStatesForPartitionInternal(partitionIndex: Int): Unit = {}

override protected def evalInternal(input: InternalRow): UTF8String = {
InputFileNameHolder.getInputFileName()
@@ -50,9 +50,9 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {

@transient private[this] var partitionMask: Long = _

override protected def initInternal(): Unit = {
override protected def initializeStatesForPartitionInternal(partitionIndex: Int): Unit = {
count = 0L
partitionMask = TaskContext.getPartitionId().toLong << 33
partitionMask = partitionIndex.toLong << 33
}

override def nullable: Boolean = false
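For reference, a small worked example of the id layout used above: the upper bits carry the partition index and the low 33 bits carry a per-row counter, so ids from different partitions never overlap and no longer depend on which task happens to compute the partition.

val partitionIndex = 1
val partitionMask = partitionIndex.toLong << 33                  // 8589934592
val firstIdsInPartition1 = (0L until 3L).map(partitionMask + _)  // 8589934592, 8589934593, 8589934594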
@@ -68,9 +68,10 @@ case class MonotonicallyIncreasingID() extends LeafExpression with Nondeterministic {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val countTerm = ctx.freshName("count")
val partitionMaskTerm = ctx.freshName("partitionMask")
ctx.addMutableState(ctx.JAVA_LONG, countTerm, s"$countTerm = 0L;")
ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm,
s"$partitionMaskTerm = ((long) org.apache.spark.TaskContext.getPartitionId()) << 33;")
ctx.addMutableState(ctx.JAVA_LONG, countTerm, "")
ctx.addMutableState(ctx.JAVA_LONG, partitionMaskTerm, "")
ctx.addPartitionInitializationStatement(s"$countTerm = 0L;")
ctx.addPartitionInitializationStatement(s"$partitionMaskTerm = ((long) partitionIndex) << 33;")

ev.copy(code = s"""
final ${ctx.javaType(dataType)} ${ev.value} = $partitionMaskTerm + $countTerm;
@@ -30,10 +30,12 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
def this(expressions: Seq[Expression], inputSchema: Seq[Attribute]) =
this(expressions.map(BindReferences.bindReference(_, inputSchema)))

expressions.foreach(_.foreach {
case n: Nondeterministic => n.setInitialValues()
case _ =>
})
override def initializeStatesForPartition(partitionIndex: Int): Unit = {
expressions.foreach(_.foreach {
case n: Nondeterministic => n.initializeStatesForPartition(partitionIndex)
case _ =>
})
}

// null check is required for when Kryo invokes the no-arg constructor.
protected val exprArray = if (expressions != null) expressions.toArray else null
@@ -63,10 +65,12 @@ case class InterpretedMutableProjection(expressions: Seq[Expression]) extends MutableProjection {

private[this] val buffer = new Array[Any](expressions.size)

expressions.foreach(_.foreach {
case n: Nondeterministic => n.setInitialValues()
case _ =>
})
override def initializeStatesForPartition(partitionIndex: Int): Unit = {
expressions.foreach(_.foreach {
case n: Nondeterministic => n.initializeStatesForPartition(partitionIndex)
case _ =>
})
}

private[this] val exprArray = expressions.toArray
private[this] var mutableRow: InternalRow = new GenericInternalRow(exprArray.length)
@@ -17,16 +17,15 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, IntegerType}

/**
* Expression that returns the current partition id of the Spark task.
* Expression that returns the current partition id.
*/
@ExpressionDescription(
usage = "_FUNC_() - Returns the current partition id of the Spark task",
usage = "_FUNC_() - Returns the current partition id",
Contributor

Hmm, this is behavior-changing, and there is some value to the old partition id...

Contributor

I'd consider introducing a new expression for the proper id and leave the old one as is.

Contributor Author

I thought about this, but I don't think the current behavior is what users expect. This is the same issue as in monotonically_increasing_id.

Contributor
@rxin rxin Oct 20, 2016

Yeah, but it is consistent with TaskContext.partitionId (which is also the name of the function).

Contributor Author

The name is SparkPartitionID, not TaskContextPartitionID. We should follow the same semantics for non-deterministic expressions.

extended = "> SELECT _FUNC_();\n 0")
case class SparkPartitionID() extends LeafExpression with Nondeterministic {

@@ -38,16 +37,16 @@ case class SparkPartitionID() extends LeafExpression with Nondeterministic {

override val prettyName = "SPARK_PARTITION_ID"

override protected def initInternal(): Unit = {
partitionId = TaskContext.getPartitionId()
override protected def initializeStatesForPartitionInternal(partitionIndex: Int): Unit = {
partitionId = partitionIndex
}

override protected def evalInternal(input: InternalRow): Int = partitionId

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val idTerm = ctx.freshName("partitionId")
ctx.addMutableState(ctx.JAVA_INT, idTerm,
s"$idTerm = org.apache.spark.TaskContext.getPartitionId();")
ctx.addMutableState(ctx.JAVA_INT, idTerm, "")
ctx.addPartitionInitializationStatement(s"$idTerm = partitionIndex;")
ev.copy(code = s"final ${ctx.javaType(dataType)} ${ev.value} = $idTerm;", isNull = "false")
}
}
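To make the semantic question in the discussion above concrete, here is a hedged illustration (hypothetical local session): previously the expression read TaskContext.getPartitionId() at execution time, so a coalesce could collapse the reported ids to the ids of the fewer tasks; the intent of this change is that the value reflect the index of the partition actually being evaluated.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.spark_partition_id

val spark = SparkSession.builder().master("local[4]").appName("pid-sketch").getOrCreate()
val pids = spark.range(0, 8, 1, 4).select(spark_partition_id().as("pid"))
pids.distinct().show()               // four ids: 0, 1, 2, 3
pids.coalesce(1).distinct().show()   // old behavior: a single task id; intended behavior: the original ids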
@@ -184,6 +184,20 @@ class CodegenContext {
splitExpressions(initCodes, "init", Nil)
}

/**
* Code statements to initialize states that depend on the partition index.
* An integer `partitionIndex` will be made available within the scope.
*/
val partitionInitializationStatements: mutable.ArrayBuffer[String] = mutable.ArrayBuffer.empty

def addPartitionInitializationStatement(statement: String): Unit = {
Contributor

Any reason you are creating this rather than just using addMutableState?

Contributor Author

I'm a little worried about introducing more issues by moving initMutableStates out of the constructor. The current implementation at least maintains the existing behavior if we miss initializeStatesForPartition somewhere.

partitionInitializationStatements += statement
}

def initPartition(): String = {
partitionInitializationStatements.mkString("\n")
}

/**
* Holding all the functions those will be added into generated class.
*/
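The split the author describes can be summarized as follows: constructor-time initialization registered through addMutableState keeps running in the generated constructor, while statements registered through addPartitionInitializationStatement are emitted into the new initializeStatesForPartition method. A rough, simplified sketch of the resulting generated class (mirroring the codeBody templates further down in this diff; not the actual codegen output):

val generatedClassSketch =
  """
    |public SpecificProjection(Object[] references) {
    |  // output of ctx.initMutableStates(): partition-independent state keeps its
    |  // existing constructor-time behavior
    |}
    |
    |public void initializeStatesForPartition(int partitionIndex) {
    |  // output of ctx.initPartition(), e.g.
    |  //   partitionMask = ((long) partitionIndex) << 33;
    |}
  """.stripMargin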
@@ -25,15 +25,23 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, LeafExpression, Nondeterministic}
trait CodegenFallback extends Expression {

protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
foreach {
case n: Nondeterministic => n.setInitialValues()
case _ =>
}

// LeafNode does not need `input`
val input = if (this.isInstanceOf[LeafExpression]) "null" else ctx.INPUT_ROW
val idx = ctx.references.length
ctx.references += this
var childIndex = idx
this.foreach {
case n: Nondeterministic =>
// This might add the current expression twice, but it won't hurt.
ctx.references += n
childIndex += 1
ctx.addPartitionInitializationStatement(
s"""
|((Nondeterministic) references[$childIndex])
| .initializeStatesForPartition(partitionIndex);
""".stripMargin)
case _ =>
}
val objectTerm = ctx.freshName("obj")
val placeHolder = ctx.registerComment(this.toString)
if (nullable) {
@@ -111,6 +111,10 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], MutableProjection]
${ctx.initMutableStates()}
}

public void initializeStatesForPartition(int partitionIndex) {
${ctx.initPartition()}
}

${ctx.declareAddedFunctions()}

public ${classOf[BaseMutableProjection].getName} target(InternalRow row) {
@@ -25,19 +25,26 @@ import org.apache.spark.sql.catalyst.expressions._
*/
abstract class Predicate {
def eval(r: InternalRow): Boolean

/**
* Initialize internal states given the current partition index.
* This is used by non-deterministic expressions to set initial states.
* The default implementation does nothing.
*/
def initializeStatesForPartition(partitionIndex: Int): Unit = {}
Contributor

To make this safer, I'd create an internal variable "isInitialized", similar to the one in the non-deterministic expressions, and assert in eval if isInitialized is false.

Contributor Author

I didn't test it. Would doing that hurt performance?

Contributor

I don't think so, since it is in the interpreted path, which is already very slow. Also, in the normal case the condition will always be false, so CPU branch prediction should work its magic.

}

/**
* Generates bytecode that evaluates a boolean [[Expression]] on a given input [[InternalRow]].
*/
object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Boolean] {
object GeneratePredicate extends CodeGenerator[Expression, Predicate] {

protected def canonicalize(in: Expression): Expression = ExpressionCanonicalizer.execute(in)

protected def bind(in: Expression, inputSchema: Seq[Attribute]): Expression =
BindReferences.bindReference(in, inputSchema)

protected def create(predicate: Expression): ((InternalRow) => Boolean) = {
protected def create(predicate: Expression): Predicate = {
val ctx = newCodeGenContext()
val eval = predicate.genCode(ctx)

@@ -55,6 +62,10 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Boolean]
${ctx.initMutableStates()}
}

public void initializeStatesForPartition(int partitionIndex) {
${ctx.initPartition()}
}

${ctx.declareAddedFunctions()}

public boolean eval(InternalRow ${ctx.INPUT_ROW}) {
@@ -67,7 +78,6 @@ object GeneratePredicate extends CodeGenerator[Expression, (InternalRow) => Boolean]
new CodeAndComment(codeBody, ctx.getPlaceHolderToComments()))
logDebug(s"Generated predicate '$predicate':\n${CodeFormatter.format(code)}")

val p = CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
(r: InternalRow) => p.eval(r)
CodeGenerator.compile(code).generate(ctx.references.toArray).asInstanceOf[Predicate]
}
}
@@ -173,6 +173,10 @@ object GenerateSafeProjection extends CodeGenerator[Seq[Expression], Projection]
${ctx.initMutableStates()}
}

public void initializeStatesForPartition(int partitionIndex) {
${ctx.initPartition()}
}

${ctx.declareAddedFunctions()}

public java.lang.Object apply(java.lang.Object _i) {
@@ -380,6 +380,10 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafeProjection]
${ctx.initMutableStates()}
}

public void initializeStatesForPartition(int partitionIndex) {
${ctx.initPartition()}
}

${ctx.declareAddedFunctions()}

// Scala.Function1 need this
@@ -64,7 +64,15 @@ package object expressions {
* column of the new row. If the schema of the input row is specified, then the given expression
* will be bound to that schema.
*/
abstract class Projection extends (InternalRow => InternalRow)
abstract class Projection extends (InternalRow => InternalRow) {

/**
* Initialize internal states given the current partition index.
* This is used by non-deterministic expressions to set initial states.
* The default implementation does nothing.
*/
def initializeStatesForPartition(partitionIndex: Int): Unit = {}
}

/**
* Converts a [[InternalRow]] to another Row given a sequence of expression that define each
@@ -31,10 +31,6 @@ object InterpretedPredicate {
create(BindReferences.bindReference(expression, inputSchema))

def create(expression: Expression): (InternalRow => Boolean) = {
expression.foreach {
case n: Nondeterministic => n.setInitialValues()
case _ =>
}
(r: InternalRow) => expression.eval(r).asInstanceOf[Boolean]
}
}
@@ -42,8 +42,8 @@ abstract class RDG extends LeafExpression with Nondeterministic {
*/
@transient protected var rng: XORShiftRandom = _

override protected def initInternal(): Unit = {
rng = new XORShiftRandom(seed + TaskContext.getPartitionId)
override protected def initializeStatesForPartitionInternal(partitionIndex: Int): Unit = {
rng = new XORShiftRandom(seed + partitionIndex)
}

override def nullable: Boolean = false
Expand All @@ -70,8 +70,9 @@ case class Rand(seed: Long) extends RDG {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
ctx.addMutableState(className, rngTerm, "")
ctx.addPartitionInitializationStatement(
s"$rngTerm = new $className(${seed}L + partitionIndex);")
ev.copy(code = s"""
final ${ctx.javaType(dataType)} ${ev.value} = $rngTerm.nextDouble();""", isNull = "false")
}
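A self-contained sketch of the seeding scheme above (java.util.Random standing in for Spark's XORShiftRandom): each partition draws from an independent, reproducible stream derived from seed + partitionIndex, so re-evaluating the same partition, for example under a coalesced task, produces the same values.

import java.util.Random

val seed = 42L
def partitionStream(partitionIndex: Int, n: Int): Seq[Double] = {
  val rng = new Random(seed + partitionIndex)
  Seq.fill(n)(rng.nextDouble())
}

assert(partitionStream(0, 3) == partitionStream(0, 3))  // deterministic per partition
assert(partitionStream(0, 3) != partitionStream(1, 3))  // but different across partitions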
@@ -93,8 +94,9 @@ case class Randn(seed: Long) extends RDG {
override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val rngTerm = ctx.freshName("rng")
val className = classOf[XORShiftRandom].getName
ctx.addMutableState(className, rngTerm,
s"$rngTerm = new $className(${seed}L + org.apache.spark.TaskContext.getPartitionId());")
ctx.addMutableState(className, rngTerm, "")
ctx.addPartitionInitializationStatement(
s"$rngTerm = new $className(${seed}L + partitionIndex);")
ev.copy(code = s"""
final ${ctx.javaType(dataType)} ${ev.value} = $rngTerm.nextGaussian();""", isNull = "false")
}
@@ -1060,6 +1060,7 @@ object ConvertToLocalRelation extends Rule[LogicalPlan] {
case Project(projectList, LocalRelation(output, data))
if !projectList.exists(hasUnevaluableExpr) =>
val projection = new InterpretedProjection(projectList, output)
projection.initializeStatesForPartition(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection))
}

@@ -75,7 +75,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {

protected def evaluate(expression: Expression, inputRow: InternalRow = EmptyRow): Any = {
expression.foreach {
case n: Nondeterministic => n.setInitialValues()
case n: Nondeterministic => n.initializeStatesForPartition(0)
case _ =>
}
expression.eval(inputRow)
@@ -121,6 +121,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
val plan = generateProject(
GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
expression)
plan.initializeStatesForPartition(0)

val actual = plan(inputRow).get(0, expression.dataType)
if (!checkResult(actual, expected)) {
@@ -182,12 +183,14 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
var plan = generateProject(
GenerateMutableProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
expression)
plan.initializeStatesForPartition(0)
var actual = plan(inputRow).get(0, expression.dataType)
assert(checkResult(actual, expected))

plan = generateProject(
GenerateUnsafeProjection.generate(Alias(expression, s"Optimized($expression)")() :: Nil),
expression)
plan.initializeStatesForPartition(0)
actual = FromUnsafeProjection(expression.dataType :: Nil)(
plan(inputRow)).get(0, expression.dataType)
assert(checkResult(actual, expected))