Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Sort Merge Join LeftSemi issues when JoinFilter is set #10304

Merged
merged 20 commits into from
May 20, 2024

Conversation

comphead
Copy link
Contributor

@comphead comphead commented Apr 29, 2024

Which issue does this PR close?

Closes #10379 .

Rationale for this change

Fixing some existing SMJ LeftSemi bugs when join filter is set. Currently the join either crashes or giving wrong results

What changes are included in this PR?

Are these changes tested?

Are there any user-facing changes?

@comphead
Copy link
Contributor Author

I was able to fix initial query but now stuck on

Error: ArrowError(InvalidArgumentError("all columns in a record batch must have the same length"), None)

Likely related to nulls

@comphead comphead changed the title Fix: Sort Merge Join crashes on TPCH Q21 Fix: Sort Merge Join LeftSemi issues May 4, 2024
@comphead comphead marked this pull request as ready for review May 7, 2024 04:18
@comphead
Copy link
Contributor Author

comphead commented May 7, 2024

fuzztests failing...

@github-actions github-actions bot added the core Core datafusion crate label May 7, 2024
@@ -79,20 +79,20 @@ async fn test_full_join_1k() {
}

#[tokio::test]
async fn test_semi_join_10k() {
async fn test_semi_join_1k() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made it 1k to be consistent with other join fuzz tests

@comphead comphead changed the title Fix: Sort Merge Join LeftSemi issues Fix: Sort Merge Join LeftSemi issues when JoinFilter is set May 7, 2024
@@ -991,6 +992,9 @@ impl SMJStream {
Ordering::Equal => {
if matches!(self.join_type, JoinType::LeftSemi) {
join_streamed = !self.streamed_joined;
// if the join filter specified there can be references to buffered columns
// so its needed to join them
join_buffered = self.filter.is_some();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LeftSemi doesn't join buffered side, why we want to do this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. As the filter uses buffered columns, we need to access to it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the comment doesn't look correct as we don't actually join buffered columns.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, but if there are more rows at buffered side are matched on keys, won't it add additional joined pairs with nulls and buffered rows?

Copy link
Contributor Author

@comphead comphead May 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hope I got the concern right and wrapped it into SQL query

query II
select * from (
with
t1 as (
    select 11 a, 12 b union all
    select 11 a, 13 b),
t2 as (
    select 11 a, 12 b union all
    select 11 a, 12 b union all
    select 11 a, 14 b
    )
select t1.* from t1 where exists (select 1 from t2 where t2.a = t1.a and t2.b != t1.b)
) order by 1, 2;
----
11 12
11 13

it passes, I can add it to slt file as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not able to run such test just because of other SMJ issue not related to this PR:
If the join filter is set and for the same streaming index there are matching rows more or equal to a batch size then the query just stuck. Likely the problem is in polling state and it can be easily reproduced on main branch.

  #[tokio::test]
    async fn test_11() -> Result<()> {
        let ctx: SessionContext = SessionContext::new();

        let sql = "set datafusion.optimizer.prefer_hash_join = false;";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "set datafusion.execution.batch_size = 1";
        let _ = ctx.sql(sql).await?.collect().await?;

        let sql = "
        select * from (
        with
        t1 as (
            select 12 a, 12 b
            ),
        t2 as (
            select 12 a, 12 b
            )
            select t1.* from t1 join t2 on t1.a = t2.b where t1.a > t2.b
        ) order by 1, 2;
        ";

        let actual = ctx.sql(sql).await?.collect().await?;


        Ok(())
    }

I'll file a separate issue for this one, but perhaps we can go with this PR because potential problem you talking about cannot ever be hit because of the issue above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for creating the ticket.

I think that current fix to the LeftSemi isn't correct due to the additional joined pairs of nulls and buffered rows. I don't think we should move forward with it because of that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm putting the PR to draft until we can check if PR requires modifications to avoid addition join pairs of nulls and buffered rows. It depends on #10491

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have checked the case, it doesn't fail, however it produces more rows than expected, looking into this

Copy link
Member

@viirya viirya May 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, what I meant is actually it will produce some rows that are not correct results (due to the additional joined pairs of nulls and buffered rows).

@comphead comphead requested a review from viirya May 9, 2024 15:04
@comphead
Copy link
Contributor Author

@viirya @alamb can I get a review on this PR please?

@alamb
Copy link
Contributor

alamb commented May 13, 2024

I will review this today

@viirya
Copy link
Member

viirya commented May 13, 2024

I'll take another look today.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @comphead -- while I am not an expert in this code it looks like an improvement to me (less panic'ing and better test coverage all around) 👍

cc @richox who I think contributed the first version of this code in #2242 and @yjshen who reviewed, in case you would also like to review the change

@@ -263,5 +263,139 @@ DROP TABLE t1;
statement ok
DROP TABLE t2;


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To verify that these tests cover the code changes, I ran them locally without the code changes in this PR and they failed as expected 👍

Running "sort_merge_join.slt"
thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1356:22:
index out of bounds: the len is 0 but the index is 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
External error: task 17 panicked
Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`

Caused by:
  process didn't exit successfully: `/Users/andrewlamb/Software/datafusion/target/debug/deps/sqllogictests-ce3a36cfeab74789 sort_merge` (exit status: 1)

@@ -1161,6 +1162,15 @@ impl SMJStream {
let filter_columns = if chunk.buffered_batch_idx.is_some() {
if matches!(self.join_type, JoinType::Right) {
get_filter_column(&self.filter, &buffered_columns, &streamed_columns)
} else if matches!(self.join_type, JoinType::LeftSemi) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should also check for JoinType::Left (and the clause above also check for JoinType::RightSemi 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have more to go, LeftAnti is first as it prevents TPCH to run and then double check RightSemi as well, good point

datafusion/physical-plan/src/joins/sort_merge_join.rs Outdated Show resolved Hide resolved
let streamed_indices_length = streamed_indices.len();
let mut corrected_mask: Vec<bool> = vec![false; streamed_indices_length];

#[allow(clippy::needless_range_loop)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder why ignore clippy here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clippy doesn't like for loops anymore ....

// have we seen a filter match for a streaming index before
let mut seen_as_true: bool = false;
let streamed_indices_length = streamed_indices.len();
let mut corrected_mask: Vec<bool> = vec![false; streamed_indices_length];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW it might be faster / easier to followto create the BooleanArray directly using BooleanBuilder rather than Vec

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, btw, I'm thinking why BooleanArray, doesnt support capacity with default values to achieve the same as

vec![false; streamed_indices_length];

async fn left_semi_join_filtered_mask() -> Result<()> {
assert_eq!(
get_filtered_join_mask(
LeftSemi,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should test a type other than LeftSemi as negative test coverage 🤔

@comphead
Copy link
Contributor Author

Thanks @alamb I'll add more docs and also added a task to check RightSemi join to #9846

I'll let it be opened a little longer to give @viirya more time to have a second eye on the PR

@github-actions github-actions bot removed the core Core datafusion crate label May 17, 2024
@@ -54,7 +45,17 @@ use datafusion_execution::TaskContext;
use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::{PhysicalExprRef, PhysicalSortRequirement};

use futures::{Stream, StreamExt};
use crate::expressions::PhysicalSortExpr;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change is by formatter

@comphead comphead marked this pull request as ready for review May 17, 2024 04:18
@comphead
Copy link
Contributor Author

@viirya I'm planning to merge this PR soon as it fixes the crash, and addresses your concern (please see the slt test covering this specific case). All other improvements can be in follow up PR.

Comment on lines 1000 to 1004
join_streamed = !self
.streamed_batch
.join_filter_matched_idxs
.contains(&(self.streamed_batch.idx as u64))
&& !self.streamed_joined;
Copy link
Member

@viirya viirya May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If self.streamed_joined is false, join_filter_matched_idxs always doesn't contain self.streamed_batch.idx, so the two conditions are duplicated as they are both true.

If self.streamed_joined is true, this and check is failed, the another condition doesn't matter.

I'm not sure what this is added to check.

I don't see it addresses the issue in https://github.com/apache/datafusion/pull/10304/files#r1601943239.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I got what you want to do here. It is better to add a comment for later readers.

Comment on lines 1009 to 1011
if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_none() {
join_streamed = !self.streamed_joined;
// if the join filter specified there can be references to buffered columns
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block knows self.filter.is_none(), why you still do join_buffered = self.filter.is_some();?

Copy link
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've seen some issues in this patch. It doesn't look like a correct fix.

@comphead
Copy link
Contributor Author

I've seen some issues in this patch. It doesn't look like a correct fix.

The tests currently in sync with what hash join returns, is there a test showing the opposite?

Comment on lines 1011 to 1013
// if the join filter specified there can be references to buffered columns
// so buffered columns are needed to access them
join_buffered = self.filter.is_some();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// if the join filter specified there can be references to buffered columns
// so buffered columns are needed to access them
join_buffered = self.filter.is_some();

@@ -989,8 +996,21 @@ impl SMJStream {
}
}
Ordering::Equal => {
if matches!(self.join_type, JoinType::LeftSemi) {
if matches!(self.join_type, JoinType::LeftSemi) && self.filter.is_some() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can combine this and below which both are for JoinType::LeftSemi under single if block.

@viirya
Copy link
Member

viirya commented May 20, 2024

I've seen some issues in this patch. It doesn't look like a correct fix.

Took another look. Looks okay to me.

@comphead comphead merged commit 94b5511 into apache:main May 20, 2024
23 checks passed
@alamb
Copy link
Contributor

alamb commented May 21, 2024

🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Sort Merge Join. LeftSemi issues
3 participants