Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-17474] [SQL] fix python udf in TakeOrderedAndProjectExec #15030

Closed
wants to merge 3 commits into from

Conversation

davies
Copy link
Contributor

@davies davies commented Sep 9, 2016

What changes were proposed in this pull request?

When there is any Python UDF in the Project between Sort and Limit, it will be collected into TakeOrderedAndProjectExec, ExtractPythonUDFs failed to pull the Python UDFs out because QueryPlan.expressions does not include the expression inside Option[Seq[Expression]].

Ideally, we should fix the QueryPlan.expressions, but tried with no luck (it always run into infinite loop). In PR, I changed the TakeOrderedAndProjectExec to no use Option[Seq[Expression]] to workaround it. cc @JoshRosen

How was this patch tested?

Added regression test.

@SparkQA
Copy link

SparkQA commented Sep 9, 2016

Test build #65161 has finished for PR 15030 at commit 3ea0daf.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala:62: type mismatch;
[error]  found   : None.type
[error]  required: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]
[error]           noOpFilter(TakeOrderedAndProjectExec(limit, sortOrder, None, input)),
[error]                                                                  ^
[error] /home/jenkins/workspace/SparkPullRequestBuilder/sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala:77: type mismatch;
[error]  found   : Some[Seq[org.apache.spark.sql.catalyst.expressions.Attribute]]
[error]  required: Seq[org.apache.spark.sql.catalyst.expressions.NamedExpression]
[error]             TakeOrderedAndProjectExec(limit, sortOrder, Some(Seq(input.output.last)), input)),
[error]                     

@SparkQA
Copy link

SparkQA commented Sep 9, 2016

Test build #65167 has finished for PR 15030 at commit 263c147.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -148,8 +148,8 @@ case class TakeOrderedAndProjectExec(
localTopK, child.output, SinglePartition, serializer))
shuffled.mapPartitions { iter =>
val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
if (projectList.isDefined) {
val proj = UnsafeProjection.create(projectList.get, child.output)
if (AttributeSet(projectList) != child.outputSet) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be order-insensitive, set-based comparision or should it be using AttributeSeq instead? I'm wondering whether we could hit a bug in case the project happens to permute the child output columns, since in that case I think we'd end up skipping the final column-reordering projection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, we should just compare it with output as Seq directly.

@SparkQA
Copy link

SparkQA commented Sep 12, 2016

Test build #65273 has finished for PR 15030 at commit 1e319d8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Copy link
Contributor

LGTM

@davies
Copy link
Contributor Author

davies commented Sep 12, 2016

Merging into 2.0 and master.

@asfgit asfgit closed this in a91ab70 Sep 12, 2016
asfgit pushed a commit that referenced this pull request Sep 12, 2016
## What changes were proposed in this pull request?

When there is any Python UDF in the Project between Sort and Limit, it will be collected into TakeOrderedAndProjectExec, ExtractPythonUDFs failed to pull the Python UDFs out because QueryPlan.expressions does not include the expression inside Option[Seq[Expression]].

Ideally, we should fix the `QueryPlan.expressions`, but tried with no luck (it always run into infinite loop). In PR, I changed the TakeOrderedAndProjectExec to no use Option[Seq[Expression]] to workaround it. cc JoshRosen

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes #15030 from davies/all_expr.

(cherry picked from commit a91ab70)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
wgtmac pushed a commit to wgtmac/spark that referenced this pull request Sep 19, 2016
## What changes were proposed in this pull request?

When there is any Python UDF in the Project between Sort and Limit, it will be collected into TakeOrderedAndProjectExec, ExtractPythonUDFs failed to pull the Python UDFs out because QueryPlan.expressions does not include the expression inside Option[Seq[Expression]].

Ideally, we should fix the `QueryPlan.expressions`, but tried with no luck (it always run into infinite loop). In PR, I changed the TakeOrderedAndProjectExec to no use Option[Seq[Expression]] to workaround it. cc JoshRosen

## How was this patch tested?

Added regression test.

Author: Davies Liu <davies@databricks.com>

Closes apache#15030 from davies/all_expr.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants