
[SPARK-12660] [SPARK-14967] [SQL] Implement Except Distinct by Left Anti Join #12736

Closed
wants to merge 10 commits into from

Conversation

gatorsmile
Member

@gatorsmile gatorsmile commented Apr 27, 2016

What changes were proposed in this pull request?

Replaces a logical Except operator with a Left-anti Join operator. This way, we can take advantage of all the benefits of join implementations (e.g. managed memory, code generation, broadcast joins).

  SELECT a1, a2 FROM Tab1 EXCEPT SELECT b1, b2 FROM Tab2
  ==>  SELECT DISTINCT a1, a2 FROM Tab1 LEFT ANTI JOIN Tab2 ON a1<=>b1 AND a2<=>b2

Note:

  1. This rule is only applicable to EXCEPT DISTINCT. Do not use it for EXCEPT ALL.
  2. This rule has to be applied after de-duplicating the attributes; otherwise, the generated
    join conditions will be incorrect.
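To make the rewrite concrete, here is a small, Spark-independent sketch of the EXCEPT DISTINCT semantics the rule targets (the function names are hypothetical; `null_safe_eq` models SQL's `<=>`, which treats two NULLs as equal):

```python
def null_safe_eq(a, b):
    # Models SQL's <=>: NULL <=> NULL is true, NULL <=> x is false.
    if a is None or b is None:
        return a is None and b is None
    return a == b

def except_distinct(left, right):
    # SELECT DISTINCT ... LEFT ANTI JOIN ... ON a1 <=> b1 AND a2 <=> b2:
    # deduplicate the left side, then keep rows with no match on the right.
    seen, out = set(), []
    for row in left:
        if row in seen:
            continue  # DISTINCT on the left side
        seen.add(row)
        matched = any(
            all(null_safe_eq(a, b) for a, b in zip(row, r)) for r in right
        )
        if not matched:  # anti join keeps only unmatched rows
            out.append(row)
    return out
```

For example, `except_distinct([(1,), (2,), (2,), (None,)], [(1,), (None,)])` returns only `[(2,)]`: the duplicate `(2,)` is removed by DISTINCT, and the NULL row is removed because `<=>` matches it against the NULL on the right.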

This PR also corrects the existing behavior in Spark. Before this PR, the behavior was:

  test("except") {
    val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
    val df_right = Seq(1, 3).toDF("id")

    checkAnswer(
      df_left.except(df_right),
      Row(2) :: Row(2) :: Row(4) :: Nil
    )
  }

After this PR, the result is corrected; we strictly follow the SQL semantics of EXCEPT DISTINCT.

How was this patch tested?

Modified and added a few test cases to verify the optimization rule and the results of operators.

@gatorsmile
Member Author

cc @hvanhovell Thank you for your quick fix! This PR includes the anti-join fix from #12730 to avoid test case failures.

When #12730 is merged, I will remove the related code.

@SparkQA

SparkQA commented Apr 27, 2016

Test build #57134 has finished for PR 12736 at commit f825dca.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

Merged fix.

@gatorsmile
Member Author

Thank you very much! I will clean up the code soon.

@@ -291,7 +291,7 @@ public void testSetOperation() {
     unioned.collectAsList());

     Dataset<String> subtracted = ds.except(ds2);
-    Assert.assertEquals(Arrays.asList("abc", "abc"), subtracted.collectAsList());
+    Assert.assertEquals(Arrays.asList("abc"), subtracted.collectAsList());
Member Author
This is pretty strange. I will check the exact behavior of our current Except operator. It looks like it does not remove duplicates.

Contributor

Except apparently does not remove duplicates:

range(0, 10).registerTempTable("a")
range(5, 15).registerTempTable("b")
sql("(select * from a union all select * from a) except select * from b")

results in:

+---+
| id|
+---+
|  2|
|  2|
|  0|
|  0|
|  3|
|  3|
|  4|
|  4|
|  1|
|  1|
+---+

So that is weird.

Member Author

We have to decide what to do next. This PR implements EXCEPT DISTINCT.

If we want to keep the existing behavior, which is Except All, we need to change the external API in Dataset.

However, the current SQL interface for Except is wrong. We need to correct it at least.

Contributor

The current implementation (before this PR) is somewhere between EXCEPT and EXCEPT ALL: it will remove all left rows that find a match (essentially eliminating their duplicates), but it does not remove duplicates where there is no match. Let's follow the principle of least surprise and create a correct EXCEPT (one that removes duplicates).
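For illustration, that pre-PR behavior can be modeled like this (a hedged sketch of the observed semantics, not the actual Spark code; `old_except` is a hypothetical name):

```python
def old_except(left, right):
    # Pre-PR behavior: drop every left row that matches the right side,
    # but keep duplicates among the non-matching rows.
    right_set = set(right)
    return [v for v in left if v not in right_set]
```

For example, `old_except([1, 2, 2, 3, 3, 4], [1, 3])` returns `[2, 2, 4]`, which is neither EXCEPT ALL nor EXCEPT DISTINCT.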

Member Author

Uh, I see... Let me add a test case to cover it, ensuring it will not be broken again.

Member Author

This is the example to show why the current master is wrong.

  test("except") {
    val df_left = Seq(1, 2, 2, 3, 3, 4).toDF("id")
    val df_right = Seq(1, 3).toDF("id")

    checkAnswer(
      df_left.except(df_right),
      Row(2) :: Row(2) :: Row(4) :: Nil
    )
  }

For EXCEPT ALL, we should output

Row(2) :: Row(2) :: Row(3) :: Row(4) :: Nil

For EXCEPT DISTINCT, we should output

Row(2) :: Row(4) :: Nil
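Both expected results can be reproduced with a small multiset model (Python's `Counter` here is just an illustration of the semantics, not Spark code):

```python
from collections import Counter

left = [1, 2, 2, 3, 3, 4]
right = [1, 3]

# EXCEPT ALL: each value survives max(count_left - count_right, 0) times;
# Counter subtraction already drops non-positive counts.
except_all = sorted((Counter(left) - Counter(right)).elements())

# EXCEPT DISTINCT: plain set difference over distinct values.
except_distinct = sorted(set(left) - set(right))

print(except_all)       # [2, 2, 3, 4]
print(except_distinct)  # [2, 4]
```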

Contributor

@cloud-fan cloud-fan Apr 28, 2016

Did this PR also fix the semantic of Except, or it only added the optimization?

Member Author

Yeah. After this PR, the behavior of EXCEPT is changed to the standard behavior of EXCEPT DISTINCT.

@SparkQA

SparkQA commented Apr 27, 2016

Test build #57143 has finished for PR 12736 at commit 70da501.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile gatorsmile changed the title [SPARK-12660] [SQL] Implement Except by Left Anti Join [SPARK-12660] [SQL] Implement Except Distinct by Left Anti Join Apr 27, 2016
@SparkQA

SparkQA commented Apr 27, 2016

Test build #57164 has finished for PR 12736 at commit 3c61a5a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile gatorsmile changed the title [SPARK-12660] [SQL] Implement Except Distinct by Left Anti Join [SPARK-12660] [SPARK-14967] [SQL] Implement Except Distinct by Left Anti Join Apr 27, 2016
@SparkQA

SparkQA commented Apr 27, 2016

Test build #57183 has finished for PR 12736 at commit 7ccd95b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member Author

cc all the people who previously reviewed #10630 : @rxin @marmbrus @cloud-fan @yhuai @nongli

This is ready for review. Thanks!


// Check if no Project is added
assert(r3.left.isInstanceOf[LocalRelation])
assert(r3.right.isInstanceOf[LocalRelation])
Contributor

Why remove these? We didn't change the analysis of Except, right?

Member Author

We added a new condition to resolved: https://github.com/apache/spark/pull/12736/files#diff-72917e7b68f0311b2fb42990e0dc616dR180 In this test case, we are trying to Except the same table firstTable, so the value of resolved now becomes false.

When resolving WidenSetOperationTypes, the guard

left.output.length == right.output.length && !s.resolved =>

now evaluates to true. (Before this PR, this condition was always false when you Except the same table, so we did nothing in that case and no Project was added.)

However, when the condition is true, we execute the corresponding logic: we find the common type and then add a Project. This performs the same duplicate checking as in

val r1 = wt(Except(firstTable, secondTable)).asInstanceOf[Except]
val r2 = wt(Intersect(firstTable, secondTable)).asInstanceOf[Intersect]
checkOutput(r1.left, expectedTypes)
checkOutput(r1.right, expectedTypes)
checkOutput(r2.left, expectedTypes)
checkOutput(r2.right, expectedTypes)
// Check if a Project is added
assert(r1.left.isInstanceOf[Project])
assert(r1.right.isInstanceOf[Project])
assert(r2.left.isInstanceOf[Project])
assert(r2.right.isInstanceOf[Project])
That is why I removed these assertions.

Sorry, it is a bit complicated to explain the whole logic. Let me know if anything is unclear.

Contributor

got it, thanks for the explanation!

@cloud-fan
Contributor

LGTM. An unrelated question: how do we express the EXCEPT ALL semantics?

@gatorsmile
Member Author

gatorsmile commented Apr 28, 2016

@cloud-fan Thank you for your review!

Like INTERSECT ALL, EXCEPT ALL can be done by aggregating the union of the two tables. We can augment one table with a new column of constant 1 and the other with constant -1, union the two tables, then group by the row values and use the SUM of the new column to decide how many copies of each row to output.
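That aggregation trick can be sketched as follows (an illustrative model of the idea only, not Spark code; the `except_all` helper is hypothetical):

```python
from collections import defaultdict

def except_all(left, right):
    # Tag left rows with +1 and right rows with -1, union the tagged rows,
    # then sum the tags per value: the sum equals count_left - count_right.
    sums = defaultdict(int)
    for v in left:
        sums[v] += 1
    for v in right:
        sums[v] -= 1
    # Emit each value as many times as its positive tag sum.
    out = []
    for v, s in sums.items():
        out.extend([v] * max(s, 0))
    return out
```

For example, `sorted(except_all([1, 2, 2, 3, 3, 4], [1, 3]))` gives `[2, 2, 3, 4]`, matching the EXCEPT ALL result discussed above.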

Originally, I planned to do it after this release so that all of you have more time to review. If we need it in this release, I can start writing both INTERSECT ALL and EXCEPT ALL now.

@SparkQA

SparkQA commented Apr 28, 2016

Test build #57227 has finished for PR 12736 at commit 4920360.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit asfgit closed this in 222dcf7 Apr 29, 2016
@cloud-fan
Contributor

thanks! merging to master!
