[SPARK-5817] [SQL] Fix bug of udtf with column names #4602

Closed

chenghao-intel
Contributor

This fixes a bug that occurs when running a query like:

select d from (select explode(array(1,1)) d from src limit 1) t

The query throws an exception like:

org.apache.spark.sql.AnalysisException: cannot resolve 'd' given input columns _c0; line 1 pos 7
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:48)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$apply$3$$anonfun$apply$1.applyOrElse(CheckAnalysis.scala:45)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:249)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$transformExpressionUp$1(QueryPlan.scala:103)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2$$anonfun$apply$2.apply(QueryPlan.scala:117)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:116)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)

Fixing the bug requires refactoring the UDTF code.
The major changes are:

  • Simplifying UDTF development: a UDTF no longer manages its output attribute names; instead, logical.Generate handles them.
  • A UDTF is asked for its output schema (data types) during logical plan analysis.
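
As a rough illustration of the division of labour described above, here is a minimal, self-contained sketch. It is a toy model and not Catalyst code: `DataType`, `Generator`, `ExplodeInts`, `Attribute`, and this `Generate` are simplified stand-ins defined only for the example, and the `_c0`-style default naming is an assumption based on the column name shown in the error message.

```scala
// Toy model only: these types are NOT Spark's Catalyst classes.
object GenerateNamingSketch {
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType

  // A generator reports only the data types of the columns it produces.
  trait Generator {
    def elementTypes: Seq[DataType]
  }

  // Hypothetical stand-in for explode over an array of ints: one int column.
  case object ExplodeInts extends Generator {
    val elementTypes: Seq[DataType] = Seq(IntType)
  }

  final case class Attribute(name: String, dataType: DataType)

  // The plan node owns the naming: user-supplied aliases win,
  // otherwise fall back to positional defaults (_c0, _c1, ...).
  final case class Generate(generator: Generator, aliases: Seq[String]) {
    def output: Seq[Attribute] =
      generator.elementTypes.zipWithIndex.map { case (dataType, i) =>
        Attribute(aliases.lift(i).getOrElse(s"_c$i"), dataType)
      }
  }

  def main(args: Array[String]): Unit = {
    // With the alias attached to the plan node, an outer query can refer to "d".
    println(Generate(ExplodeInts, Seq("d")).output)
    // Without an alias, the column gets a default name.
    println(Generate(ExplodeInts, Nil).output)
  }
}
```

In this sketch the generator never sees a column name, so an alias such as `d` cannot be lost inside the generator expression.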

@SparkQA

SparkQA commented Feb 14, 2015

Test build #27472 has started for PR 4602 at commit 5ddab7e.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 14, 2015

Test build #27472 has finished for PR 4602 at commit 5ddab7e.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27472/
Test FAILed.

@SparkQA

SparkQA commented Feb 14, 2015

Test build #27473 has started for PR 4602 at commit 7738ca6.

  • This patch merges cleanly.

@@ -101,6 +101,7 @@ case class Alias(child: Expression, name: String)
extends NamedExpression with trees.UnaryNode[Expression] {

override type EvaluatedType = Any
override lazy val resolved = childrenResolved && !child.isInstanceOf[Generator]
Contributor

why this change?

Contributor Author

Alias(Generator) is not like a normal expression: it will be transformed into Generate(Generator, alias).
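
The effect of that rule can be sketched with a small, self-contained toy model. This is not Catalyst code: `Expression`, `Literal`, `Explode`, and `Alias` below are simplified stand-ins defined only for this example, and only the `resolved` condition mirrors the line in the diff.

```scala
// Toy model only: simplified stand-ins, not Spark's Catalyst classes.
object AliasResolvedSketch {
  sealed trait Expression {
    def children: Seq[Expression]
    def childrenResolved: Boolean = children.forall(_.resolved)
    def resolved: Boolean = childrenResolved
  }

  final case class Literal(value: Int) extends Expression {
    val children: Seq[Expression] = Nil
  }

  // Marker for expressions that can produce multiple rows or columns.
  sealed trait Generator extends Expression

  final case class Explode(child: Expression) extends Generator {
    val children: Seq[Expression] = Seq(child)
  }

  final case class Alias(child: Expression, name: String) extends Expression {
    val children: Seq[Expression] = Seq(child)

    // An alias around a generator stays unresolved, which signals that the
    // analyzer still has to rewrite it (into a Generate node in the real code).
    override def resolved: Boolean =
      childrenResolved && !child.isInstanceOf[Generator]
  }

  def main(args: Array[String]): Unit = {
    println(Alias(Literal(1), "x").resolved)          // alias of a plain expression
    println(Alias(Explode(Literal(1)), "d").resolved) // alias of a generator
  }
}
```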

Contributor

Can you add a comment to this effect?

@SparkQA

SparkQA commented Feb 14, 2015

Test build #27473 has finished for PR 4602 at commit 7738ca6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27473/
Test PASSed.

@@ -137,6 +137,11 @@ class Analyzer(catalog: Catalog,
failAnalysis(
s"unresolved operator ${operator.simpleString}")

case p @ Project(exprs, _) if exprs.length > 1 && exprs.collect {
Contributor

perhaps exprs.find(_.isInstanceOf[Generator]).isDefined

Contributor Author

e.g. Project(Alias(Generator1, name), Alias(Generator2, name2))

Contributor Author

Oh, it's a bug in my code, thanks for finding this. :)
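
For readers following this thread, the difference between a top-level check and a check that also looks inside aliases can be sketched as below. This is a toy model, not Catalyst code: the expression types are simplified stand-ins, `hasTopLevelGenerator` is a hypothetical name, and the body of `containsMultipleGenerators` is an assumption about what such a helper might do.

```scala
// Toy model only: simplified stand-ins, not Spark's Catalyst classes.
object MultipleGeneratorsSketch {
  sealed trait Expression {
    def children: Seq[Expression]

    // Apply pf to this node and every descendant, keeping the matches.
    def collect[B](pf: PartialFunction[Expression, B]): Seq[B] =
      pf.lift(this).toSeq ++ children.flatMap(_.collect(pf))
  }

  final case class Column(name: String) extends Expression {
    val children: Seq[Expression] = Nil
  }

  sealed trait Generator extends Expression

  final case class Explode(child: Expression) extends Generator {
    val children: Seq[Expression] = Seq(child)
  }

  final case class Alias(child: Expression, name: String) extends Expression {
    val children: Seq[Expression] = Seq(child)
  }

  // Top-level check (hypothetical name): misses generators wrapped in an Alias.
  def hasTopLevelGenerator(exprs: Seq[Expression]): Boolean =
    exprs.exists(_.isInstanceOf[Generator])

  // Assumed body: count generators anywhere inside the projected expressions.
  def containsMultipleGenerators(exprs: Seq[Expression]): Boolean =
    exprs.flatMap(_.collect { case g: Generator => g }).length > 1

  def main(args: Array[String]): Unit = {
    val twoGenerators: Seq[Expression] =
      Seq(Alias(Explode(Column("a")), "x"), Alias(Explode(Column("b")), "y"))
    println(hasTopLevelGenerator(twoGenerators))
    println(containsMultipleGenerators(twoGenerators))
  }
}
```

Because each generator in the example projection sits under an `Alias`, only the recursive version sees both of them.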

@SparkQA

SparkQA commented Feb 15, 2015

Test build #27499 has started for PR 4602 at commit 9656e51.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 15, 2015

Test build #27499 has finished for PR 4602 at commit 9656e51.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27499/
Test PASSed.

@chenghao-intel
Contributor Author

@marmbrus any more comments on this?

@yhuai
Contributor

yhuai commented Feb 17, 2015

I tried the following, which fails with an AnalysisException:

val rdd = sc.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
sqlContext.jsonRDD(rdd).registerTempTable("jt")
sqlContext.sql("CREATE TABLE gen_tmp (key Int)")
sqlContext.sql("INSERT OVERWRITE TABLE gen_tmp SELECT explode(array(1,2,3)) AS val FROM jt LIMIT 1")
org.apache.spark.sql.AnalysisException: invalid cast from array<struct<_c0:int>> to int;
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.failAnalysis(Analyzer.scala:85)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$18$$anonfun$apply$2.applyOrElse(Analyzer.scala:98)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$apply$18$$anonfun$apply$2.applyOrElse(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:250)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:249)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:263)

@chenghao-intel
Contributor Author

Thank you @yhuai, I've updated the description and rebased the code.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27617 has started for PR 4602 at commit f6907d2.

  • This patch merges cleanly.

@chenghao-intel
Contributor Author

retest this please.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27620 has started for PR 4602 at commit f6907d2.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27617 has finished for PR 4602 at commit f6907d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27617/
Test PASSed.

@SparkQA

SparkQA commented Feb 17, 2015

Test build #27620 has finished for PR 4602 at commit f6907d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27620/
Test PASSed.

@chenghao-intel
Contributor Author

/cc @marmbrus @yhuai Any comment on this?

@yhuai
Contributor

yhuai commented Feb 18, 2015

@chenghao-intel After another look at the code, I think it may be better to remove aliases from the generator. Then MultiAlias can be used to assign names to the output fields of a generator. Because a Generator is just an Expression, it does not seem like a good idea to put names in it. Instead, wrapping a Generator in a NamedExpression (e.g. MultiAlias) looks like a better approach.

@chenghao-intel
Contributor Author

A generator is not like a normal expression: it can output multiple columns. In the current implementation, the logical plan node Generate serves that purpose, not Project. I agree that this needs to be improved, since there is some duplicated code in the subclasses of Generator; perhaps all we need is a more general logical plan node Project? But that seems to require more changes. I can do that after this PR is merged.

@chenghao-intel
Contributor Author

@yhuai please ignore my previous comment; I was thinking about some other possibilities.
I agree with you that we can move the output column names into the logical plan node Generate, but I am not sure whether the generator expression itself should be able to manage the default field names (when none are specified).

@chenghao-intel
Contributor Author

@yhuai @marmbrus this is a bug fix, so it would be great if you could give more comments on it. I agree with @yhuai that we need to refactor the UDTF expression implementation, but can I put that in the next PR? This is actually a blocking issue for our internal benchmark.

@@ -144,6 +144,12 @@ class Analyzer(catalog: Catalog,
failAnalysis(
s"unresolved operator ${operator.simpleString}")

case p @ Project(exprs, _) if exprs.length > 1 && exprs.flatMap(_.collect {
Contributor

pull containsMultipleGenerators out into a function.

case p @ Project(exprs, _) if containsMultipleGenerators(exprs) =>
failAnalysis(
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)
Contributor

Do we have a test for this error?

Contributor Author

Yes, I added a unit test for it; see HiveQuerySuite.scala.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30468 has started for PR 4602 at commit d2e8b43.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30468 has finished for PR 4602 at commit d2e8b43.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30468/
Test FAILed.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30489 has started for PR 4602 at commit 5ee5d2c.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30490 has started for PR 4602 at commit 04ae500.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30491 has started for PR 4602 at commit 556e982.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30493 has started for PR 4602 at commit c2a5132.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30489 has finished for PR 4602 at commit 5ee5d2c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30489/
Test FAILed.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30490 has finished for PR 4602 at commit 04ae500.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30490/
Test FAILed.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30491 has finished for PR 4602 at commit 556e982.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
  • This patch does not change any dependencies.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30491/
Test FAILed.

@SparkQA

SparkQA commented Apr 17, 2015

Test build #30493 has finished for PR 4602 at commit c2a5132.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Explode(child: Expression)
  • This patch does not change any dependencies.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/30493/
Test PASSed.

@marmbrus
Contributor

Thanks, merged to master.

@asfgit asfgit closed this in 7662ec2 Apr 21, 2015
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
Author: Cheng Hao <hao.cheng@intel.com>

Closes apache#4602 from chenghao-intel/explode_bug and squashes the following commits:

c2a5132 [Cheng Hao] add back resolved for Alias
556e982 [Cheng Hao] revert the unncessary change
002c361 [Cheng Hao] change the rule of resolved for Generate
04ae500 [Cheng Hao] add qualifier only for generator output
5ee5d2c [Cheng Hao] prepend the new qualifier
d2e8b43 [Cheng Hao] Update the code as feedback
ca5e7f4 [Cheng Hao] shrink the commits
@chenghao-intel chenghao-intel deleted the explode_bug branch July 2, 2015 08:44