
[SPARK-47320][SQL] : The behaviour of Datasets involving self joins is inconsistent, unintuitive, with contradictions #45446

Closed

Conversation


@ahshahid ahshahid commented Mar 8, 2024

What changes were proposed in this pull request?

The basis of the change is to distinguish and resolve ambiguity based on the Dataset from which the user extracted the column, instead of on ExprIds.
That results in behaviour which is consistent, intuitive, and logically correct.
The current code mixes the resolution basis, sometimes using the ExprId and sometimes, indirectly, the Dataset ID.
This PR uses the Dataset ID present in the AttributeReference's metadata to check whether the ambiguity can be resolved logically and sensibly against the Dataset IDs of the joining Datasets.

The PR attempts to fix the issue in the following way:

If a projection field contains an AttributeReference that is not found in the incoming AttributeSet, and the AttributeReference's metadata contains the Dataset ID info, then the AttributeReference is converted into a new UnresolvedAttributeWithTag, with the original AttributeReference passed as a parameter.

In ColumnResolutionHelper, a new resolution logic is used to resolve the UnresolvedAttributeWithTag:

1. The Dataset ID is extracted from the original AttributeReference's metadata.
2. The first BinaryNode contained in the LogicalPlan containing this unresolved attribute is found.
3. The unary nodes of its right leg and left leg are checked for the presence of the AttributeReference's Dataset ID, using TreeNodeTag("__datasetid").
4. If both legs contain the Dataset ID at the same relative depth, or neither leg contains it, a resolution exception is thrown.
5. Otherwise, the leg that contains the Dataset ID is used to resolve the attribute.

A rough sketch of this logic appears below.
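For illustration only, here is a minimal standalone sketch of the leg-based resolution described above. It models the plan with a toy ADT rather than Spark's internal LogicalPlan classes; only the "__datasetid" tag name comes from this PR, and all other names here are hypothetical. Picking the shallower leg when both match at different depths follows the "shortest depth" idea discussed later in this thread.

```scala
// Toy plan model standing in for Spark's LogicalPlan hierarchy (illustrative only).
sealed trait Plan { def datasetIdTag: Option[Long] }
case class Leaf(datasetIdTag: Option[Long] = None) extends Plan
case class Unary(child: Plan, datasetIdTag: Option[Long] = None) extends Plan
case class Binary(left: Plan, right: Plan, datasetIdTag: Option[Long] = None) extends Plan

object LegResolution {
  // Depth (1-based) at which `datasetId` is tagged on the chain of unary nodes
  // under one leg of the join; None if the id is not found before a leaf or a nested join.
  def tagDepth(plan: Plan, datasetId: Long, depth: Int = 1): Option[Int] =
    if (plan.datasetIdTag.contains(datasetId)) Some(depth)
    else plan match {
      case Unary(child, _) => tagDepth(child, datasetId, depth + 1)
      case _               => None // stop at leaves and nested joins
    }

  // Resolve against the first binary (join) node: fail when both legs carry the
  // dataset id at the same relative depth or neither carries it; otherwise use
  // the matching leg (the shallower one if both match, per the depth rule above).
  def resolveLeg(join: Binary, datasetId: Long): Either[String, Plan] =
    (tagDepth(join.left, datasetId), tagDepth(join.right, datasetId)) match {
      case (Some(l), Some(r)) if l == r => Left("ambiguous: both legs carry the dataset id at the same depth")
      case (None, None)                 => Left("cannot resolve: neither leg carries the dataset id")
      case (Some(l), Some(r))           => Right(if (l < r) join.left else join.right)
      case (Some(_), None)              => Right(join.left)
      case (None, Some(_))              => Right(join.right)
    }
}
```

Depth 1 here corresponds to the direct legs of the topmost join, matching this PR's restriction to columns extracted from the top-level joining Datasets.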

Why are the changes needed?

While fixing a bug where an Ambiguous Column exception was raised (for code that worked fine in earlier versions of Spark), I came across multiple situations in which a nested joined Dataset involving self joins works but fails when the join order is changed, or in which a column extracted from a Dataset involved in the join is treated as unambiguous when used in the join condition but causes an ambiguity exception when used in the projection (select).
There is also an existing test which I believe is falsely passing, in that the attribute is not being resolved to the expected Dataset.
For example:
```scala
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"),
  df2("aa"), df1("b"))

df1Joindf2.join(df1, df1Joindf2("a") === df1("a"))
```

The above works fine, but the snippet below throws an exception. The only difference between the two is that the latter adds select(df1("a")). Yet df1("a") works fine when used as a join condition:

```scala
val df1 = Seq((1, 2)).toDF("a", "b")
val df2 = Seq((1, 2)).toDF("aa", "bb")
val df1Joindf2 = df1.join(df2, df1("a") === df2("aa")).select(df1("a"),
  df2("aa"), df1("b"))

df1Joindf2.join(df1, df1Joindf2("a") === df1("a")).select(df1("a"))
```

There is an existing test in DataFrameSelfJoinSuite:

```scala
test("SPARK-28344: fail ambiguous self join - column ref in Project") {
  val df1 = spark.range(3)
  val df2 = df1.filter($"id" > 0)

  withSQLConf(
    SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "false",
    SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
    // `df2("id")` actually points to the column of `df1`.
    checkAnswer(df1.join(df2).select(df2("id")), Seq(0, 0, 1, 1, 2, 2).map(Row(_)))

    // Aliasing the dataframes and using qualified column names fixes the ambiguous self-join.
    val aliasedDf1 = df1.alias("left")
    val aliasedDf2 = df2.as("right")
    checkAnswer(
      aliasedDf1.join(aliasedDf2).select($"right.id"),
      Seq(1, 1, 1, 2, 2, 2).map(Row(_)))
  }

  withSQLConf(
    SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> "true",
    SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
    // Assertion 1: existing
    assertAmbiguousSelfJoin(df1.join(df2).select(df2("id")))

    // Assertion 2: added by me
    assertAmbiguousSelfJoin(df2.join(df1).select(df2("id")))
  }
}
```
Here Assertion 1 passes (that is, the ambiguous-self-join exception is thrown), but Assertion 2 fails (no exception is thrown). The only change is the join order.

Logically both assertions are invalid, in the sense that neither should throw an exception: from the user's perspective there is no ambiguity.
This PR addresses that.

Does this PR introduce any user-facing change?

Yes.
Datasets involving self joins that may previously have thrown ambiguity-related exceptions are now expected to work, assuming the columns used in the APIs are extracted from the Datasets being joined at the topmost level.

How was this patch tested?

Added new tests and made existing assertions stricter. Modified the existing tests in DataFrameSelfJoinSuite that are logically unambiguous based on the Datasets from which the columns are extracted.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Mar 8, 2024
@ahshahid ahshahid changed the title [SPARK-47217][SQL] : The behaviour of Datasets involving self joins is inconsistent, unintuitive, with contradictions [SPARK-47320][SQL] : The behaviour of Datasets involving self joins is inconsistent, unintuitive, with contradictions Mar 8, 2024
The review comments below refer to this diff hunk in ColumnResolutionHelper:

```scala
@@ -477,6 +482,57 @@ trait ColumnResolutionHelper extends Logging with DataTypeErrorsBase {
      assert(q.children.length == 1)
      q.children.head.output
    },

    resolveOnDatasetId = (datasetid: Long, name: String) => {
```
@cloud-fan (Contributor) commented:

can you check out function tryResolveDataFrameColumns in this file and see if it matches your expectations?

@cloud-fan (Contributor) commented:

Can you also check #43115 and see if it fixes your problem?

@ahshahid (Author) commented:

Thanks @cloud-fan, will check. The thing is that when a Column is extracted from a DataFrame, it comes as already resolved (and in cases of bugs, resolved incorrectly). But I will still check #43115.

@ahshahid (Author) commented:

> can you check out function tryResolveDataFrameColumns in this file and see if it matches your expectations?

I will check it out, but isn't the plan ID tag set only when using the Connect client?

@ahshahid (Author) commented:

> Can you also check #43115 and see if it fixes your problem?

@cloud-fan I applied the patch to nearly the latest master, together with the tests for SPARK-47320. The new tests fail along with existing ones, so I suppose PR #43115 needs to be enhanced.

@ahshahid (Author) commented:

@peter-toth @cloud-fan,
Attached is a patch showing my attempt to use the plan-ID resolution logic. It does not work: the plan-ID resolution recurses to the full depth of the plan, and even an attempt to end the recursion early fails, because the comparison of the recursion's return values does not carry sufficient data. A further attempt to somehow reuse that code is proving too cumbersome for me.
patch.txt

I have, though, removed the previously added new class UnresolvedAttributeWithTag.

@peter-toth (Contributor) commented Mar 16, 2024:

I have 2 notes on the above:

- @ahshahid, the following worked in Spark 3.5 but fails in 4.0 after [SPARK-43838][SQL] Fix subquery on single table with having clause can't be optimized #41347, for the same reason as described in the old [SPARK-47217][SQL] Fix ambiguity check in self joins #45343:

  ```scala
  test("SPARK-47217: DeduplicateRelations issue 4") {
    Seq(true, false).foreach(fail =>
      withSQLConf(SQLConf.FAIL_AMBIGUOUS_SELF_JOIN_ENABLED.key -> fail.toString) {
        val df = Seq((1, 2)).toDF("a", "b")
        val df2 = df.select(df("a").as("aa"), df("b").as("bb"))
        val df3 = df.select(df("a"), df("b"))
        // `df("a")` doesn't come from the join's direct children, but from its descendants
        val df4 = df2.join(df3, df2("bb") === df("b")).select(df2("aa"), df("a"))
        checkAnswer(df4, Row(1, 1) :: Nil)
      }
    )
  }
  ```

  In this test df("a")'s expression id gets deduplicated (in the right side of the join), so the original expression id doesn't work in the final select. But I think this test case proves that we need tryResolveDataFrameColumns()-like deep recursion when we try resolving by plan ids.

- @cloud-fan, I think there is a different problem with tryResolveDataFrameColumns(). I did try to use it for "re-resolving" attribute references that became invalid, in a quick test: peter-toth@a873c24, but a few test cases failed because some logical plans can belong to multiple datasets. E.g. if we have:

  ```scala
  val df = Seq((1, 2)).toDF("a", "b")
  val df2 = df.toDF()
  ```

  then df and df2 share the same logical plan instance, but we can't store multiple ids in the current LogicalPlan.PLAN_ID_TAG. A sketch of a multi-id tag follows.
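For the single-id limitation in note 2, here is a minimal sketch of the kind of multi-id tag that would be needed. It assumes Spark's TreeNodeTag / getTagValue / setTagValue API; PLAN_IDS_TAG and addPlanId are hypothetical names, not actual Spark APIs:

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

object MultiPlanIdTag {
  // Today a plan node carries at most one Long plan id, so tagging a shared
  // plan for a second Dataset would overwrite the first id.
  val PLAN_ID_TAG = TreeNodeTag[Long]("plan_id")

  // Hypothetical alternative: accumulate every dataset/plan id that refers
  // to the same LogicalPlan instance.
  val PLAN_IDS_TAG = TreeNodeTag[Set[Long]]("plan_ids")

  def addPlanId(plan: LogicalPlan, id: Long): Unit = {
    val ids = plan.getTagValue(PLAN_IDS_TAG).getOrElse(Set.empty[Long])
    plan.setTagValue(PLAN_IDS_TAG, ids + id)
  }
}
```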

@ahshahid (Author) commented:

@peter-toth I agree with your analysis. In the current PR, the approach I had in mind was, for simplicity, to allow columns only from the top-level joining dataframes. The reasoning was:

1. It gives predictable behaviour and makes the outcome easier for the user to comprehend.
2. Resolution does not need deep traversals. With repeated dataframes, if we descend to the leaves the dataset IDs will most likely clash, so to resolve the ambiguity we would have to resort to something like shortest depth etc. (I still use depth, but only 1 level deep.)

@ahshahid (Author) commented:

Yes, PlanId being a single value is a sort of limitation. Though I would hold back my thoughts, because I have not fully grasped the planId usage code, except that the Spark Connect code is very sensitive to it.

@peter-toth (Contributor) commented:

Maybe we could fix this issue with a small change like this: #45552

@ahshahid (Author) commented:

@peter-toth @cloud-fan,
IMHO Spark's current approach of resolving the attribute to a dataframe lower than the top-level dataframe(s), which in the process adds missing attributes to the various projections in between, can be detrimental to performance without the user being aware of the cause. The scenario I have in mind: say the user has cached the lower dataframes. With the plan implicitly adding missing Projects, those cached plans may become unusable, without the user being aware of the situation.

@ahshahid (Author) commented:

@peter-toth does your PR #45552 imply that this PR can be closed?

@ahshahid (Author) commented:

> @peter-toth does your PR #45552 imply that this PR can be closed?

I am wondering if a combination of your PR #45552 and this PR would solve the issue satisfactorily, i.e.:

1. If DATASET_ID_TAG is present in the UnresolvedAttribute, the first preference for resolution would be the code of this PR. This resolves the attribute when there is apparent unambiguity based on the user's choice of the dataframe from which to extract columns (given that the dataframe used is a leg of the top-level join).
2. If that resolution is not possible, or is ambiguous, delegate to plan-ID resolution.

A rough sketch of this two-step chain is below.
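As illustration only, a minimal sketch of the proposed two-step chain, with both resolvers stubbed out; the names resolveOnDatasetId and resolveByPlanId are hypothetical, not actual Spark APIs:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.NamedExpression
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object CombinedResolution {
  // Step 1: this PR's dataset-id based resolution against the top-level join legs.
  def resolveOnDatasetId(u: UnresolvedAttribute, plan: LogicalPlan): Option[NamedExpression] = ???

  // Step 2: Connect-style plan-id resolution (the approach of #45552 / tryResolveDataFrameColumns).
  def resolveByPlanId(u: UnresolvedAttribute, plan: LogicalPlan): Option[NamedExpression] = ???

  // Proposed combination: prefer dataset-id resolution, fall back to plan-id resolution.
  def resolveColumn(u: UnresolvedAttribute, plan: LogicalPlan): Option[NamedExpression] =
    resolveOnDatasetId(u, plan).orElse(resolveByPlanId(u, plan))
}
```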

@peter-toth (Contributor) commented:

Let's keep this PR open till we fully figure out how to deal with these issues: #45552 (comment)

ashahid and others added 5 commits March 29, 2024 15:12
…tes resolved using datasetId for top level join, the behaviour remains unchanged independent of the flag spark.sql.analyzer.failAmbiguousSelfJoin value
… to LogicalPlan irrespective of boolean FAIL_AMBIGUOUS_SELF_JOIN_ENABLED enabled or not.
github-actions (bot) commented:

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Jul 26, 2024
@github-actions github-actions bot closed this Jul 27, 2024