-
Notifications
You must be signed in to change notification settings - Fork 28k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-9020][SQL] Support mutable state in code gen expressions #7392
Conversation
cc @rxin |
@@ -49,4 +55,17 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression { | |||
count += 1 | |||
(TaskContext.get().partitionId().toLong << 33) + currentCount |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use partitionMask here?
* in generated classes like `SpecificProjection`. | ||
* Each element is a 3-tuple: java type, variable name, variable value. | ||
*/ | ||
val mutableStates: mutable.ArrayBuffer[(String, String, Any)] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's add a function so we don;t call += directly on the array buffer.
def addMutableState(javaType: String, variableName: String, initialValue: String): Unit
this way it also makes it more clear what the semantics are for each element in the tuple3.
val code = s""" | ||
public SpecificOrdering generate($exprType[] expr) { | ||
return new SpecificOrdering(expr); | ||
public SpecificOrdering generate($exprType[] expr, Object[] states) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we pass in the code to initialize the variables, rather than using an object array?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be hard to define the generate
interface, as this interface is not codegened.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And this initialization will happen only once, and the member variables can be primitive type, so boxing is not a problem here, like int i = (Integer) states[3]
, we will use i after that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe i'm missing something. what i'm saying is that the generated code can look something like
class GenerateProjection456 {
private long nextId123;
public GenerateProjection456() {
nextId123 = 50L;
}
...
}
does this not work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i'm not worried about performance -- i just think it's ugly and unnecessary to pass the state around.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This works for literals, but how about objects? That's also the reason why we pass expressions this way...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we have any expressions that require objects for now? if not, it's better to start with a simpler solution. if yes, then yes let's do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need XORShiftRandom
in Rand
. One solution is we only regard the partition id as mutable state, but then we need to create a new XORShiftRandom
every time we evaluate Rand
, which is not good I think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we just pass the code to create XORShiftRandom in the constructor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just realized that...
I'll change type of mutableStates
to Array[(String, String, String)]
, contains java type, variable name, code to initialize it, then we can avoid passing that ugly Object[] states
.
Test build #37212 has finished for PR 7392 at commit
|
@@ -205,7 +213,7 @@ class CodeGenContext { | |||
|
|||
|
|||
abstract class GeneratedClass { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while you are at this, can you add scaladoc for GeneratedClass? It is not obvious what it does just by looking at it.
Test build #37213 has finished for PR 7392 at commit
|
* Returns the partition id of currently active TaskContext. It will return 0 | ||
* if there is no active TaskContext for cases like local execution. | ||
*/ | ||
def getPartitionId(): Int = Option(taskContext.get).map(_.partitionId).getOrElse(0) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to fix codes for other ids (stageId and attemptId) to return 0 if taskContext not defined?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't use an Option wrapper here, just in case the caller expects higher performance.
Test build #37227 has finished for PR 7392 at commit
|
Test build #37226 has finished for PR 7392 at commit
|
cc @davies for review as well |
@cloud-fan Actually, we can access the Expression object in generated Java code (see Expression.genCode), so it's not true that we can not support Expression that have mutable state in it. For Random and MonotonicallyIncreasingID, we will not have much benefit from the codegen version (they both are LeafExpression), I'd like not to complicate them. Correct me, if there are real performance difference. For others, it can access the Expression object to fetch or update the state. |
@davies our support for mutable state in non-codegen version is actually not correct right now. For example, we have bugs w.r.t. non-determinism for Random because we don't have proper expression init (and as a result calling the expression in local mode without serialziation/deserialization breaks random's determinism given a specific seed). In the future we should just have explicit init for expressions, which is what almost every other database systems do. This is the attempt to add that for code-gen version first. |
BTW this is actually a very small change -- especially considering generatemutableprojection will just go away in the future. |
@@ -69,7 +69,9 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR | |||
} | |||
""" | |||
}.mkString("\n") | |||
|
|||
val mutableStates = ctx.mutableStates.map { case (jt, name, init) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should ordering be stateless?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jt -> javaType
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@davies just saw your comment - what did you mean by ordering should be stateless?
Test build #37304 has finished for PR 7392 at commit
|
@@ -46,6 +46,9 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu | |||
${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)}; | |||
""" | |||
}.mkString("\n") | |||
val mutableStates = ctx.mutableStates.map { case (jt, name, init) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
jt -> javaType
@@ -46,30 +46,47 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR | |||
protected def create(ordering: Seq[SortOrder]): Ordering[InternalRow] = { | |||
val ctx = newCodeGenContext() | |||
|
|||
val comparisons = ordering.zipWithIndex.map { case (order, i) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In interpreted mode, we use the same expression to eval 2 rows, which means we only keep one copy of mutable states for that expression. However, in GenerateOrdering
, we call order.child.gen(ctx)
twice and thus keep 2 copy of mutable states for that expression. This is inconsistent, and may return different compare result, so I fixed it here.
However, should we allow stateful expressions in order by?
cc @davies @rxin
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have not figured out a case that need a stateful ordering, could we delay it until we really need it?
Test build #37343 has finished for PR 7392 at commit
|
Test build #37351 has finished for PR 7392 at commit
|
retest this please. |
Test build #37356 has finished for PR 7392 at commit
|
Test build #24 timed out for PR 7392 at commit |
OK I'm merging this. @davies is right - we shouldn't need expressions with mutable state in order by. As a matter of fact, expressions with mutable state should mostly be considered nondeterministic, and in that case, we should always materialize their value before we do sorting, rather than computing the value in sorting. Let's fix that in a separate patch, since it involves more than just removing codegen mutable state from ordering. |
@cloud-fan also -- sometimes in spark sql we execute code (e.g. local project) outside of Spark's dagscheduler. In those cases, I don't think we set TaskContext. It's just more defensive to do that. |
Two related tickets: https://issues.apache.org/jira/browse/SPARK-8609 After initializing a DataFrame with random columns and a seed, ordering by that random column should return same sorted order https://issues.apache.org/jira/browse/SPARK-9071 MonotonicallyIncreasingID and SparkPartitionID should be marked as nondeterministic |
We can keep expressions' mutable states in generated class(like
SpecificProjection
) as member variables, so that we can read and modify them inside codegened expressions.