
[SPARK-23177][SQL][PySpark] Extract zero-parameter UDFs from aggregate #20360

Closed
viirya wants to merge 4 commits into master from viirya:SPARK-23177

Conversation

viirya
Member

@viirya viirya commented Jan 23, 2018

What changes were proposed in this pull request?

The ExtractPythonUDFFromAggregate rule extracts Python UDFs in a logical Aggregate that depend on an aggregate expression or a grouping key. However, Python UDFs that don't depend on any of those expressions should also be extracted, to avoid the issue reported in the JIRA.

A small code snippet that reproduces the issue:

import pyspark.sql.functions as f

df = spark.createDataFrame([(1, 2), (3, 4)])
f_udf = f.udf(lambda: str("const_str"))
df2 = df.distinct().withColumn("a", f_udf())
df2.show()  # raises the TreeNodeException shown below, before this fix

The following exception is raised:

: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#50
        at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
        at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:514)
        at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:513)

This exception is raised because HashAggregateExec tries to bind the aliased Python UDF expression (e.g., pythonUDF0#50 AS a#44) to the grouping key.
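
For illustration (this sketch is mine, not from the PR): with the fix, the zero-argument UDF is pulled out of the Aggregate into a projection above it, so the aggregate no longer binds pythonUDF0. Assuming the df2 from the snippet above, and with operator names that vary by Spark version:

# A sketch of the expected plan shape once the patch is applied, roughly:
#   Project [..., pythonUDF0 AS a]
#   +- BatchEvalPython [<lambda>()]
#      +- HashAggregate (the distinct)
df2.explain(True)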

How was this patch tested?

Added a test.

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86518 has finished for PR 20360 at commit b6cb621.

  • This patch fails Python style tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 23, 2018

cc @HyukjinKwon @cloud-fan

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86520 has finished for PR 20360 at commit 5c3afbb.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member Author

viirya commented Jan 23, 2018

retest this please.

Member

@ueshin ueshin left a comment


LGTM. I left one question though.

@@ -45,7 +45,8 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {

   private def hasPythonUdfOverAggregate(expr: Expression, agg: Aggregate): Boolean = {
     expr.find {
-      e => PythonUDF.isScalarPythonUDF(e) && e.find(belongAggregate(_, agg)).isDefined
+      e => PythonUDF.isScalarPythonUDF(e) &&
+        (e.references.isEmpty || e.find(belongAggregate(_, agg)).isDefined)
Member

Can we use just e.children instead of e.references?

Member Author

I just want to cover literal inputs like df2 = df.distinct().withColumn("a", f_udf(f.lit("2"))).
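
(An illustrative snippet of mine, reusing the df from the PR description: a UDF applied only to a literal has a non-empty children list but an empty references set, so the e.references.isEmpty check still extracts it, while a children.isEmpty check would miss it.)

import pyspark.sql.functions as f

# A literal-only UDF call: the expression has a child (Literal "2") but
# references no input columns, which is why the patch tests e.references
# rather than e.children.
f_udf2 = f.udf(lambda x: "prefix_" + x)
df.distinct().withColumn("a", f_udf2(f.lit("2"))).show()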

Member

Sorry, I wrote a duplicate comment and removed it; it didn't show up while I was writing.

Member

Oh, I see, sounds good. Thanks!

@SparkQA

SparkQA commented Jan 23, 2018

Test build #86523 has finished for PR 20360 at commit 5c3afbb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

LGTM

@@ -45,7 +45,8 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {

   private def hasPythonUdfOverAggregate(expr: Expression, agg: Aggregate): Boolean = {
     expr.find {
-      e => PythonUDF.isScalarPythonUDF(e) && e.find(belongAggregate(_, agg)).isDefined
Contributor

shall we update the classdoc too? It currently says "Extracts all the Python UDFs in logical aggregate, which depends on aggregate expression or grouping key, evaluate them after aggregate".

Member Author

Yes. Updated.

@cloud-fan
Contributor

LGTM

@SparkQA

SparkQA commented Jan 24, 2018

Test build #86551 has finished for PR 20360 at commit 74684a7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@HyukjinKwon
Member

@viirya, mind if I ask you to open a backport to branch-2.3?

@asfgit asfgit closed this in a3911cf Jan 24, 2018
@viirya
Member Author

viirya commented Jan 24, 2018

@HyukjinKwon Ok. I will open a backport later.

@hankim

hankim commented Apr 9, 2018

Is there any workaround for this? My environment hasn't been upgraded to 2.3.0 yet, but I have the exact code the JIRA ticket has (http://mail-archives.apache.org/mod_mbox/spark-issues/201801.mbox/%3CJIRA.13132665.1516622460000.6681.1516622520346@Atlassian.JIRA%3E), i.e., assigning a UUID after a distinct() call with a UDF.
Thank you!
cc @viirya @HyukjinKwon @cloud-fan

@viirya
Member Author

viirya commented Apr 10, 2018

@hankim maybe like:

import pyspark.sql.functions as f
import uuid

df = spark.createDataFrame([(1, 2), (3, 4)])
f_udf = f.udf(lambda: str(uuid.uuid4()))
df2 = df.distinct().cache()  # cache() breaks the distinct/UDF adjacency that triggers the bug
df3 = df2.withColumn("a", f_udf())
df3.show()
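
Why this helps (my reading, not stated in the thread): cache() inserts an InMemoryRelation between the distinct's Aggregate and the projection that evaluates the UDF, so HashAggregateExec never has to bind the UDF's output. A quick, version-dependent sanity check:

# A sketch: with .cache() in place, the plan should show the UDF evaluated
# over an InMemoryTableScan rather than inside the HashAggregate.
df3.explain(True)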

@viirya viirya deleted the SPARK-23177 branch December 27, 2023 18:21