[MINOR][SQL][DOCS] Add notes of the deterministic assumption on UDF functions #13087
Conversation
Test build #58530 has finished for PR 13087 at commit
Test build #58531 has finished for PR 13087 at commit
There are several cases which assume the UDF is deterministic. It would be a big change to users. I'll revert the change on ScalaUDF, and update this PR to change the optimizer not to duplicate the UDF expression.
The reported error scenario was the following.

scala> val df = sc.parallelize(Seq(("a", "b"), ("a1", "b1"))).toDF("old", "old1")
scala> val udfFunc = udf((s: String) => { println(s"running udf($s)"); s })
scala> val newDF = df.withColumn("new", udfFunc(df("old")))
scala> val filteredOnNewColumnDF = newDF.filter("new <> 'a1'")
scala> filteredOnNewColumnDF.show
running udf(a)
running udf(a)
running udf(a1)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+

The result of this PR is like the following.

scala> filteredOnNewColumnDF.show
running udf(a1)
running udf(a)
+---+----+---+
|old|old1|new|
+---+----+---+
|  a|   b|  a|
+---+----+---+
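The extra invocation can be reproduced outside Spark with a toy model of the rewrite. The following is a hypothetical sketch in plain Python (not Spark's optimizer): when the predicate is pushed below the projection, the projected UDF expression is substituted into the filter condition, so rows that survive the filter evaluate the UDF a second time in the projection.

```python
# Toy model of predicate pushdown over a projection; all names are made up.
calls = {"n": 0}

def udf(s):                      # stand-in for the user-defined function
    calls["n"] += 1
    return s

rows = [("a", "b"), ("a1", "b1")]

# Plan before pushdown: project first, then filter on the projected column.
# Each input row runs the UDF exactly once.
calls["n"] = 0
projected = [(old, old1, udf(old)) for old, old1 in rows]
result = [r for r in projected if r[2] != "a1"]
before = calls["n"]              # one call per input row

# Plan after pushdown: the predicate becomes udf(old) != 'a1' and runs below
# the projection, so each surviving row runs the UDF again when projected.
calls["n"] = 0
filtered = [(old, old1) for old, old1 in rows if udf(old) != "a1"]
result2 = [(old, old1, udf(old)) for old, old1 in filtered]
after = calls["n"]               # filter calls + one more per surviving row

print(before, after)
```

This mirrors the transcript above: two rows yield two calls before the rewrite, but three calls after it (two in the filter, one more for the surviving row).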
Test build #58535 has finished for PR 13087 at commit
Hi, @liancheng and @cloud-fan.
Test build #58647 has finished for PR 13087 at commit
Hi, @rxin.
Test build #58874 has finished for PR 13087 at commit
Hi, @marmbrus
cc @cloud-fan
@@ -1025,7 +1025,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
     // state and all the input rows processed before. In another word, the order of input rows
     // matters for non-deterministic expressions, while pushing down predicates changes the order.
     case filter @ Filter(condition, project @ Project(fields, grandChild))
-      if fields.forall(_.deterministic) =>
+      if fields.forall(_.deterministic) &&
+        fields.forall(_.find(_.isInstanceOf[ScalaUDF]).isEmpty) =>
I'm not sure if I understand this correctly. Do you mean ScalaUDF can be non-deterministic and we should always treat it as a non-deterministic expression? If so, I think a better idea is to just override deterministic in ScalaUDF and always return false.
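The two approaches discussed here can be contrasted with a toy expression tree. This is an illustrative sketch in plain Python, not Spark internals; the class and function names are invented for the example.

```python
# Toy expression tree modeling the two proposed fixes (names are made up).
class Expr:
    deterministic = True
    children = ()

    def find(self, pred):
        # Return the first node in this subtree matching pred, else None.
        if pred(self):
            return self
        for c in self.children:
            hit = c.find(pred)
            if hit is not None:
                return hit
        return None

class Column(Expr):
    pass

class ToyUDF(Expr):
    # Fix 1 (the suggestion above): treat every UDF as non-deterministic, so
    # the existing `fields.forall(_.deterministic)` guard blocks the pushdown.
    deterministic = False

    def __init__(self, child):
        self.children = (child,)

def can_push_down(fields):
    # Fix 2 (this PR's interim approach): keep UDFs "deterministic" but add an
    # explicit guard so the rule skips any projection containing a UDF.
    return all(f.deterministic for f in fields) and all(
        f.find(lambda e: isinstance(e, ToyUDF)) is None for f in fields)

print(can_push_down([Column(), ToyUDF(Column())]))  # blocked either way
```

Either fix stops the rewrite for projections containing a UDF; the difference is that Fix 1 changes the UDF's contract globally, which is what broke other test suites.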
Thank you, @cloud-fan, again!
Yep, exactly. That is what I really wanted to do, so I made it my first commit for this PR. But you can see the result above.
- the first commit: 85fa040
- Jenkins result: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/58531/consoleFull
- My decision: There are several cases which assume the UDF is deterministic. It would be a big change to users. I'll revert the change on ScalaUDF, and update this PR to change the optimizer not to duplicate the UDF expression.

I still think that is the correct solution; I totally agree with you. But, as you can see, it requires changing other test suites, so I thought I needed a committer's decision to do that.
Can you look into where we made this assumption? For now I prefer to override deterministic in ScalaUDF, if it's not a lot of effort to fix this problem.
Great! No problem. I will try to fix the other test suites correctly.
According to @cloud-fan's advice, the goal of this PR is now making
Test build #58969 has finished for PR 13087 at commit
Hmm, @cloud-fan. A nondeterministic UDF has more limitations than I expected.

override def transform(dataset: Dataset[_]): DataFrame = {
  transformSchema(dataset.schema)
  // Register a UDF for DataFrame, and then
  // create a new column named map(predictionCol) by running the predict UDF.
  val predict = udf { (userFeatures: Seq[Float], itemFeatures: Seq[Float]) =>
    if (userFeatures != null && itemFeatures != null) {
      blas.sdot(rank, userFeatures.toArray, 1, itemFeatures.toArray, 1)
    } else {
      Float.NaN
    }
  }
  dataset
    .join(userFactors,
      checkedCast(dataset($(userCol)).cast(DoubleType)) === userFactors("id"), "left")
    .join(itemFactors,
      checkedCast(dataset($(itemCol)).cast(DoubleType)) === itemFactors("id"), "left")
    .select(dataset("*"),
      predict(userFactors("features"), itemFactors("features")).as($(predictionCol)))
}

According to the Jenkins test failure log, this is the last hurdle. However, it proves how prevalent UDF usage is; Spark users might depend on this risky feature much more than expected.
I updated
Test build #58976 has finished for PR 13087 at commit
The PySpark failure is fixed as a HOTFIX.
I think we can have a way to mark a UDF as non-deterministic, but that is too large of a change to make it the default. Also, is this an actual performance problem, or does it just look like one (and common subexpression elimination is fixing it)?
@marmbrus +1
Thank you for the review, @marmbrus and @markhamstra! Actually, it's a huge change. Although I'm not aware of the real background, the reported case can be handled by just preventing the predicate pushdown. I think we can keep the common subexpression elimination without any change. Anyway, I will revert the current investigation back to my second commit.
@marmbrus +1 I suggest changing the documentation of UDFs instead, to underline the expectation that they be deterministic, for the time being.
Thank you, @thunterdb.
@marmbrus @markhamstra @thunterdb. For
Test build #59006 has finished for PR 13087 at commit
Test build #59009 has finished for PR 13087 at commit
My main question remains: does this actually make a difference in runtime? Or is execution already smart enough to do this optimization (even if, to the user, it looks like it's getting called more than once)? Also, @dongjoon-hyun, would you have time to audit the places where UDFs are documented and add the expectation that they are deterministic?
@dongjoon-hyun I don't think we support UDFs this way in SparkR?
Thank you, @felixcheung! Then, this PR is enough for the current master branch. :)
Test build #59113 has finished for PR 13087 at commit
@@ -1756,6 +1756,7 @@ def __call__(self, *cols):
 @since(1.3)
 def udf(f, returnType=StringType()):
     """Creates a :class:`Column` expression representing a user defined function (UDF).
+    Note that the user-defined functions should be deterministic.
Thanks for doing this! I would say "must be deterministic". We might even want to say that "due to optimization, duplicate invocations may be eliminated or the function may even be invoked more times than it is present in the query".
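The contract described here — the engine may call a UDF fewer or more times than it appears in the query — suggests a practical pattern for functions that are expensive or side-effecting. The following is a hypothetical workaround sketch in plain Python (not a Spark API): memoizing the function by its input makes repeated invocations observably harmless, which is what the "must be deterministic" wording effectively requires.

```python
# Sketch: make an expensive/side-effecting function safe to re-invoke by
# caching results per input. (In a real cluster the cache would be
# per-executor, so this limits, not eliminates, duplicate work.)
from functools import lru_cache

calls = []

@lru_cache(maxsize=None)
def expensive(s):
    calls.append(s)          # the side effect happens at most once per input
    return s.upper()

# The optimizer may invoke the function more times than it appears in the
# query; with memoization the observable result stays the same.
results = [expensive(x) for x in ("a", "a", "a1")]
print(results, len(calls))
```

The wrapped function still must be a pure mapping from input to output; caching only shields callers from duplicate evaluation, it cannot make a genuinely non-deterministic function safe.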
Thank you. I see. I'll fix it like that.
Hi, @marmbrus.
LGTM pending tests. @linbojin, we should also handle your use case, though maybe that should be its own JIRA. Perhaps you could open one with the information you posted here?
Test build #59147 has finished for PR 13087 at commit
Hi, @marmbrus.
Sure, that's fine.
Merging to master and 2.0.
Oops. I didn't change the title yet.
I'm not sure what happened. I'll remove this PR information from @linbojin's JIRA issue anyway.
[MINOR][SQL][DOCS] Add notes of the deterministic assumption on UDF functions

## What changes were proposed in this pull request?
Spark assumes that UDF functions are deterministic. This PR adds explicit notes about that.

## How was this patch tested?
It's only about docs.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #13087 from dongjoon-hyun/SPARK-15282.

(cherry picked from commit 37c617e)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Good thing I got distracted :) I would have just changed the title while merging, though.
Thank you so much!
Thank you all for reviewing and helping with this PR!
@marmbrus @dongjoon-hyun I will add the detailed description to the old SPARK-15282 JIRA issue.
What changes were proposed in this pull request?
Spark assumes that UDF functions are deterministic. This PR adds explicit notes about that.
How was this patch tested?
It's only about docs.