
[SPARK-29682][SQL] Resolve conflicting attributes in Expand correctly #26441

Closed
wants to merge 8 commits

Conversation

imback82
Contributor

imback82 commented Nov 8, 2019

### What changes were proposed in this pull request?

This PR addresses issues where conflicting attributes in Expand are not correctly handled.

### Why are the changes needed?

```scala
val numsDF = Seq(1, 2, 3, 4, 5, 6).toDF("nums")
val cubeDF = numsDF.cube("nums").agg(max(lit(0)).as("agcol"))
cubeDF.join(cubeDF, "nums").show
```

fails with the following exception:

```
org.apache.spark.sql.AnalysisException:
Failure when resolving conflicting references in Join:
'Join Inner
:- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#35]
:  +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36]
:     +- Project [nums#3, nums#3 AS nums#37]
:        +- Project [value#1 AS nums#3]
:           +- LocalRelation [value#1]
+- Aggregate [nums#38, spark_grouping_id#36], [nums#38, max(0) AS agcol#58]
   +- Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36]
                                                                         ^^^^^^^
      +- Project [nums#3, nums#3 AS nums#37]
         +- Project [value#1 AS nums#3]
            +- LocalRelation [value#1]

Conflicting attributes: nums#38
```

As you can see from the above plan, `nums#38`, the output of `Expand` on the right side of the `Join`, should have been handled to produce a new attribute. Since the conflict is not resolved in `Expand`, the failure happens upstream at `Aggregate`. This PR adds handling for conflicting attributes in `Expand`.
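For context, a minimal sketch of what "producing a new attribute" means here (Catalyst internals, shown for illustration only): each `AttributeReference` carries an `ExprId`, and `newInstance()` copies the attribute with a fresh one, which is how the dedup logic breaks a conflict:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.IntegerType

// nums#38 in the plan above is an AttributeReference whose ExprId is 38.
val nums = AttributeReference("nums", IntegerType)()
// Same name and type, but a freshly generated ExprId.
val fresh = nums.newInstance()
assert(fresh.name == nums.name && fresh.exprId != nums.exprId)
```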

### Does this PR introduce any user-facing change?

Yes, the previous example now shows the following output:

```
+----+-----+-----+
|nums|agcol|agcol|
+----+-----+-----+
|   1|    0|    0|
|   6|    0|    0|
|   4|    0|    0|
|   2|    0|    0|
|   5|    0|    0|
|   3|    0|    0|
+----+-----+-----+
```

### How was this patch tested?

Added new unit test.

@imback82
Contributor Author

imback82 commented Nov 8, 2019

cc: @cloud-fan @viirya

```scala
cubeDF.select("nums").distinct
  .join(group0, Seq("nums"), "inner")
  .join(group1, Seq("nums"), "inner"),
Seq.empty)
```
Member


Why is this test query different from the one in the PR description? I think the simpler query in the description is better.

@SparkQA

SparkQA commented Nov 9, 2019

Test build #113476 has finished for PR 26441 at commit b439d62.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class DateTimeConstants
  • public final class CalendarInterval implements Serializable, Comparable<CalendarInterval>
  • case class LocalShuffleReaderExec(child: SparkPlan) extends UnaryExecNode
  • class ContinuousRecordEndpoint(buckets: Seq[Seq[UnsafeRow]], lock: Object)

@SparkQA

SparkQA commented Nov 10, 2019

Test build #113506 has finished for PR 26441 at commit d815362.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class MergeIntoTable(
  • sealed abstract class MergeAction(
  • case class DeleteAction(condition: Option[Expression]) extends MergeAction(condition)
  • case class UpdateAction(
  • case class InsertAction(
  • case class Assignment(key: Expression, value: Expression) extends Expression with Unevaluable

```scala
(oldVersion,
  oldVersion.copy(
    projectList = newNamedExpression(projectList, conflictingAttributes)))

case oldVersion @ Aggregate(_, aggregateExpressions, _)
```
Member


It seems that the query in the PR description works well with the fix below, too:

```scala
case oldVersion @ Aggregate(_, aggregateExpressions, _)
    if AttributeSet(aggregateExpressions.map(_.toAttribute))
      .intersect(conflictingAttributes).nonEmpty =>
  (oldVersion, oldVersion.copy(
    aggregateExpressions = aggregateExpressions.map(_.newInstance())))
```

Could we fix this issue in an easier way than the current fix?

Contributor Author


> Could we fix this issue in an easier way than the current fix?

I don't think it is robust enough. For example, the following test fails with the suggested fix:

```
[info] - [SPARK-6231] join - self join auto resolve ambiguity *** FAILED *** (251 milliseconds)
[info]   Failed to analyze query: org.apache.spark.sql.AnalysisException: Resolved attribute(s) key#4619 missing from key#4518,value#4519 in operator !Aggregate [key#4619], [key#4619, sum(cast(key#4619 as bigint)) AS sum(key)#4620L]. Attribute(s) with the same name appear in the operation: key. Please check if the right attribute(s) are used.;;
[info]   Join Inner, (key#4518 = key#4518)
[info]   :- Aggregate [key#4518], [key#4518, count(1) AS count(1)#4610L]
[info]   :  +- Project [_1#4513 AS key#4518, _2#4514 AS value#4519]
[info]   :     +- LocalRelation [_1#4513, _2#4514]
[info]   +- !Aggregate [key#4619], [key#4619, sum(cast(key#4619 as bigint)) AS sum(key)#4620L]
[info]      +- Project [_1#4513 AS key#4518, _2#4514 AS value#4519]
[info]         +- LocalRelation [_1#4513, _2#4514]
```

Member


Ur, I see... In the query you showed in the PR description, it seems the dedup logic doesn't work in the Expand node (^^^^^ below):

```
'Join Inner
:- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#118]
:  +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
:     +- Project [nums#79, nums#79 AS nums#120]
:        +- Project [value#76 AS nums#79]
:           +- LocalRelation [value#76]
+- Aggregate [nums#121, spark_grouping_id#119], [nums#121, max(0) AS agcol#124]
              ^^^^^^^^
   +- Expand [List(nums#79, nums#120, 0), List(nums#79, null, 1)], [nums#79, nums#121, spark_grouping_id#119]
                                                                             ^^^^^^^^
      +- Project [nums#79, nums#79 AS nums#120]
         +- Project [value#76 AS nums#79]
            +- LocalRelation [value#76]
```

So, we might be able to fix this dedup issue by adding an entry for `Expand` in `dedupRight` like this?

```scala
case oldVersion @ Expand(_, output, _)
    if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
  (oldVersion, oldVersion.copy(output = output.map(_.newInstance())))
```

@cloud-fan
Contributor

It's better to explain why the bug happens in the PR description. I don't understand the current fix. Just FYI on why we only handle aliases in `Project`: the self-join dedup logic tries to find the root node that causes the conflicts. Sometimes it's an alias in a `Project`, sometimes it's a leaf node. For plain attributes in a `Project`, there must be other nodes under the `Project` that cause the conflicts.
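For reference, a hedged sketch of what that `Project` case in `dedupRight` looked like around this time (from memory; `findAliases` and `newAliases` are the Analyzer's helpers, and the exact shape may differ):

```scala
// Only conflicting aliases get fresh ExprIds here. A plain attribute
// reference in the projectList is never the root of a conflict; some
// node below the Project must have produced it.
case oldVersion @ Project(projectList, _)
    if findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
  (oldVersion, oldVersion.copy(projectList = newAliases(projectList)))
```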

@SparkQA

SparkQA commented Nov 11, 2019

Test build #113565 has finished for PR 26441 at commit 7a295cd.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@imback82
Contributor Author

Thanks @cloud-fan. I updated the PR description, and please let me know if you need additional info. Also, let me know if updating Project is not necessary.

@imback82
Contributor Author

retest this please

@maropu
Member

maropu commented Nov 11, 2019

Can this issue happen with operators other than the cube aggregate shown above?

@SparkQA

SparkQA commented Nov 12, 2019

Test build #113601 has finished for PR 26441 at commit 7a295cd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@imback82 thanks for updating the PR description! I see the problem now. The `nums#38` attribute is conflicting and it comes from `Expand`. Ideally we should let `Expand` re-generate its output attributes, but `Expand` doesn't clearly distinguish the output attributes inherited from its child from the output attributes it produces itself.

I think we should change `Expand` to follow `Generate`:

```scala
case class Expand(..., additionalOutput: Seq[Attribute]) {
  override def producedAttributes: AttributeSet = AttributeSet(additionalOutput)
  def output = child.output ++ additionalOutput
}
```

And also follow how we dedup `Generate`:

```scala
case oldVersion: Generate
    if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
  val newOutput = oldVersion.generatorOutput.map(_.newInstance())
  (oldVersion, oldVersion.copy(generatorOutput = newOutput))
```

@imback82
Contributor Author

Thanks @cloud-fan! Your suggested solution of updating Expand works as expected. However, I do not think the following

```scala
def output = child.output ++ additionalOutput
```

is always true.

For example,

```
Expand [List(nums#3, nums#37, 0), List(nums#3, null, 1)], [nums#3, nums#38, spark_grouping_id#36]
  +- Project [nums#3, nums#3 AS nums#37]
```

`nums#37` is an output of the child, but not an output of `Expand`.

So instead of adding additionalOutput to Expand, I just did the following:

```scala
case oldVersion: Expand
    if oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
  val producedAttributes = oldVersion.producedAttributes
  val newOutput = oldVersion.output.map { attr =>
    if (producedAttributes.contains(attr)) attr.newInstance() else attr
  }
  (oldVersion, oldVersion.copy(output = newOutput))
```

where Expand.producedAttributes is updated as:

```scala
override def producedAttributes: AttributeSet = AttributeSet(output diff child.output)
```
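As a sanity check of that definition against the plan above (a standalone sketch, with strings standing in for Catalyst attributes): the output is `[nums#3, nums#38, spark_grouping_id#36]` and the child's output is `[nums#3, nums#37]`, so the diff keeps exactly the attributes `Expand` itself produces:

```scala
// The #n suffixes stand for ExprIds; `diff` drops the attribute
// inherited from the child (nums#3) and keeps the produced ones.
val output      = Seq("nums#3", "nums#38", "spark_grouping_id#36")
val childOutput = Seq("nums#3", "nums#37")
assert((output diff childOutput) == Seq("nums#38", "spark_grouping_id#36"))
```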

Let me know if this approach is fine instead of updating Expand.

@cloud-fan
Contributor

Sounds good!

@viirya
Member

viirya commented Nov 13, 2019

+1 for the proposed fix. And please also update the PR title and description accordingly.

imback82 changed the title [SPARK-29682][SQL] Resolve conflicting references in aggregate expressions [SPARK-29682][SQL] Resolve conflicting references in Expand correctly Nov 14, 2019
imback82 changed the title [SPARK-29682][SQL] Resolve conflicting references in Expand correctly [SPARK-29682][SQL] Resolve conflicting attributes in Expand correctly Nov 14, 2019
@SparkQA

SparkQA commented Nov 14, 2019

Test build #113739 has finished for PR 26441 at commit 11927c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 14, 2019

Test build #113740 has finished for PR 26441 at commit e743133.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

cloud-fan closed this in e46e487 Nov 14, 2019
cloud-fan pushed a commit that referenced this pull request Nov 14, 2019
Closes #26441 from imback82/spark-29682.

Authored-by: Terry Kim <yuminkim@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e46e487)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan
Contributor

thanks, merging to master/2.4!

@imback82
Contributor Author

thanks @cloud-fan @maropu @viirya @dongjoon-hyun for review and help!

@dongjoon-hyun
Member

Thank you so much!
