Complete Possible Join Type Handling for EmptyRelation Propagation Rule #10967

Open
7 of 9 tasks
tshauck opened this issue Jun 17, 2024 · 13 comments · Fixed by #11066 · May be fixed by #11069
Labels
enhancement New feature or request

Comments

@tshauck
Contributor

tshauck commented Jun 17, 2024

Is your feature request related to a problem or challenge?

Currently on main, only the Inner join type is considered during EmptyRelation propagation, but for several other join types it is also possible to infer when an EmptyRelation can be propagated, depending on which side is empty.

Describe the solution you'd like

Fill out the join types per the TODO here, add or remove any that do or do not make sense:

// TODO: For Join, more join type need to be careful:
// For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty.
// For LeftSemi Join, if the right side is empty, the Join result is empty.
// For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??).
// For RightOuter/RightSemi/RightAnti Join, only the right side is empty, the Join result is empty.
// For RightSemi Join, if the left side is empty, the Join result is empty.
// For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??).
// For Full Join, only both sides are empty, the Join result is empty.
// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
// For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
// columns + left side columns replaced with null values.

  • For LeftOuter/LeftSemi/LeftAnti Join, only the left side is empty, the Join result is empty.
  • For LeftSemi Join, if the right side is empty, the Join result is empty.
  • For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??).
  • For RightOuter/RightSemi/RightAnti Join, only the right side is empty, the Join result is empty.
  • For RightSemi Join, if the left side is empty, the Join result is empty.
  • For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??).
  • For Full Join, only both sides are empty, the Join result is empty.
  • For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side columns + right side columns replaced with null values.
  • For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side columns + left side columns replaced with null values.
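The checklist above can be read as a decision table over the join type and which inputs are empty. Below is a minimal, std-only sketch of that table. `JoinKind`, `Rewrite`, and `plan_rewrite` are hypothetical names invented for illustration and are not DataFusion's types; the sketch also takes the "result is left/right side" reading of the anti-join items, which the TODO itself marks as an open question.

```rust
/// Hypothetical mirror of the join types named in the TODO (not DataFusion's enum).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
    RightSemi,
    RightAnti,
}

/// What a propagation rule could do once it knows which inputs are empty.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Rewrite {
    /// Replace the join with an empty relation.
    Empty,
    /// Replace the join with its left input.
    KeepLeft,
    /// Replace the join with its right input.
    KeepRight,
    /// Keep left columns, emit NULL for every right column.
    LeftPadRightNulls,
    /// Keep right columns, emit NULL for every left column.
    RightPadLeftNulls,
    /// Nothing can be inferred; leave the join alone.
    Unchanged,
}

pub fn plan_rewrite(kind: JoinKind, left_empty: bool, right_empty: bool) -> Rewrite {
    use JoinKind::*;
    match kind {
        Inner if left_empty || right_empty => Rewrite::Empty,
        Left | LeftSemi | LeftAnti if left_empty => Rewrite::Empty,
        Right | RightSemi | RightAnti if right_empty => Rewrite::Empty,
        LeftSemi if right_empty => Rewrite::Empty,
        RightSemi if left_empty => Rewrite::Empty,
        LeftAnti if right_empty => Rewrite::KeepLeft,
        RightAnti if left_empty => Rewrite::KeepRight,
        // Full join is empty only when both inputs are empty.
        Full if left_empty && right_empty => Rewrite::Empty,
        Left | Full if right_empty => Rewrite::LeftPadRightNulls,
        Right | Full if left_empty => Rewrite::RightPadLeftNulls,
        _ => Rewrite::Unchanged,
    }
}
```

The arm order matters: the "both sides empty" Full case has to come before the two null-padding arms, otherwise a Full join over two empty inputs would be rewritten into a projection instead of an empty relation.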

Describe alternatives you've considered

No response

Additional context

No response

@LorrensP-2158466
Contributor

Can I help with this one?

@tshauck
Contributor Author

tshauck commented Jun 18, 2024

Sounds good to me -- once #10963 is merged, I'll mark off the ones it updates, then you can take the rest? I took the easy ones on the first go 😅

@LorrensP-2158466
Contributor

Sounds good to me!
I've seen those others that you haven't done and they are indeed hard, I will probably have some questions when I try them.

@LorrensP-2158466
Contributor

LorrensP-2158466 commented Jun 19, 2024

I just saw @tshauck's PR was approved by 2 reviewers, so I think I can assume it will be merged soon. I was looking at the remaining cases and wanted to brainstorm in case anyone wanted to give feedback on my thoughts before I try to implement them.
Currently we have the following cases left:

  1. For LeftAnti Join, if the right side is empty, the Join result is left side(should exclude null ??).
  2. For RightAnti Join, if the left side is empty, the Join result is right side(should exclude null ??).
  3. For Full Join, only both sides are empty, the Join result is empty.
  4. For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
    columns + right side columns replaced with null values.
  5. For RightOut/Full Join, if the left side is empty, the Join can be eliminated with a Projection with right side
    columns + left side columns replaced with null values.

Case 3

I think this one is trivial. Like tshauck did, just return an empty relation.

Case 1 and 2

Anti Joins return the rows that have no matching join column values in the other table. So when there are NULLs present in the Left (Right) table during a Left (Right) Anti Join, these rows are included in the result, because a comparison with NULL never evaluates to true. DataFusion offers a way to say that NULL = NULL evaluates to true via the null_equals_null field on the Join node, so there are cases where these NULLs are excluded. But when the other table is empty, these NULL values can't match anything, regardless of the null_equals_null field. So I think that in this case NULLs should not be excluded.
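To make the argument concrete, here is a toy left anti join over a single nullable key column. It is only a model of the semantics described above, not DataFusion code; `left_anti` and its `null_equals_null` parameter are illustrative names that borrow the flag's name from the Join node.

```rust
/// Toy left anti join: keep every left key that has no match on the right.
/// `None` stands for SQL NULL.
pub fn left_anti(
    left: &[Option<i64>],
    right: &[Option<i64>],
    null_equals_null: bool,
) -> Vec<Option<i64>> {
    left.iter()
        .copied()
        .filter(|l| {
            // A left row is dropped only if some right row matches it.
            !right.iter().any(|r| match (l, r) {
                (Some(a), Some(b)) => a == b,
                // NULL matches NULL only when the flag says so.
                (None, None) => null_equals_null,
                // NULL compared with a value never matches.
                _ => false,
            })
        })
        .collect()
}
```

With a non-empty right side the flag changes the result: a NULL key on the left survives when `null_equals_null` is false and is dropped when it is true and the right side also contains a NULL. With an empty right side, `any` has nothing to iterate over, so every left row is kept, NULLs included, whatever the flag is.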

Case 4 and 5

I understand why this should be done this way and agree with it. But I don't know how to do this in the Logical Optimizer.
For example:
Table A

| ID | Name |
|----|------|
| 1  | John |
| 2  | Lisa |
| 3  | Bob  |

And Table B, which is empty

| ID | Age |
|----|-----|

When we do A Left Outer Join B Using ID we expect the result to look like this (and symmetrically for Right Outer):

| ID | Name | ID   | Age  |
|----|------|------|------|
| 1  | John | NULL | NULL |
| 2  | Lisa | NULL | NULL |
| 3  | Bob  | NULL | NULL |

How can I implement this nicely?
I thought of using Projection::new_from_schema with the schema of the original JOIN, and let DataFusion insert those NULL values when executing that Projection. But this can't be the correct way?

Thanks in advance to anyone who responds!

@tshauck
Contributor Author

tshauck commented Jun 19, 2024

For cases 4 and 5, this is kinda what you're getting at already, but there's also Projection::try_new_with_schema... maybe you could get the projection from the left side then augment it with something like lit(ScalarValue::Null) for the righthand columns. Not exactly nice either tho.

@LorrensP-2158466
Contributor

LorrensP-2158466 commented Jun 20, 2024

Hmm yeah, but it does sound better. Guess I'll have to try and test them out, maybe I'll stumble onto something else. Thanks for helping!

@tshauck
Contributor Author

tshauck commented Jun 20, 2024

You'll probably see otherwise, but #10963 merged. Please LMK if I can help on any of these.

@LorrensP-2158466
Contributor

Yeah, I need some help with Cases 4 and 5. Creating a projection of the left or right child and extending its schema from the other table doesn't work. I first tried my suggestion, but that failed very quickly; then I tried yours, which got me closer.
I'm currently doing this:

// For LeftOut/Full Join, if the right side is empty, the Join can be eliminated with a Projection with left side
// columns + right side columns replaced with null values.
JoinType::Full | JoinType::Left if right_empty => {
    dbg!("here");
    Ok(Transformed::yes(LogicalPlan::Projection(
        Projection::try_new_with_schema(
            join.schema
                .columns()
                .into_iter()
                .map(|col| {
                    // transform columns from other schema into NULLs
                    if join.right.schema().is_column_from_schema(&col)
                    {
                        Expr::Literal(ScalarValue::Null)
                    } else {
                        Expr::Column(col)
                    }
                })
                .collect(),
            join.left.clone(),
            join.schema.clone(),
        )?,
    )))
}

But when I test this, I get a SchemaError because the schema computed for the new projection differs from the original join schema
(I omitted some fields, but they don't matter here):

Original:                                       New:
DFSchema {                                 |    DFSchema { 
    inner: Schema {                        |        inner: Schema { 
        fields: [                          |            fields: [
            Field {                        |                Field { 
                name: "a",                 |                    name: "a", 
                data_type: Int64,          |                    data_type: Int64, 
                nullable: true,            |                    nullable: true, 
            },                             |                }, 
            Field {                        |                Field { 
                name: "b",                 |                    name: "b", 
                data_type: Int64,          |                    data_type: Int64, 
                nullable: true,            |                    nullable: true, 
            },                             |                }, 
            Field {                        |                Field { 
                name: "c",                 |                    name: "c", 
                data_type: Int64,          |                    data_type: Int64, 
                nullable: true,            |                    nullable: true, 
            },                             |                }, 
            Field {                        |                Field { 
                name: "b",                 |                    name: "right.b", 
                data_type: Int64,          |                    data_type: Null, 
                nullable: true,            |                    nullable: true, 
            },                             |                }, 
            Field {                        |                Field { 
                name: "c",                 |                    name: "right.c", 
                data_type: Int64,          |                    data_type: Null, 
                nullable: true,            |                    nullable: true, 
            }                              |                }
        ],                                 |            ], 
        metadata: {}                       |            metadata: {} 
    },                                     |        }, 
    field_qualifiers: [                    |        field_qualifiers: [
        Some(Bare { table: "left" }),      |            Some(Bare { table: "left" }), 
        Some(Bare { table: "left" }),      |            Some(Bare { table: "left" }), 
        Some(Bare { table: "left" }),      |            Some(Bare { table: "left" }), 
        Some(Bare { table: "right" }),     |            None, 
        Some(Bare { table: "right" })      |            None
    ],                                     |        ], 
}                                          |    }

I don't really know how to fix this. I can create a Draft PR if you want, so you can look at my code instead of this comment.

Other ways

I think that we may have to ignore these cases, and if they do cause problems on larger tables, then we can optimize the join implementation to handle empty join tables. I'm not sure though. I don't have much experience with this, so I'm just looking at other possible solutions.

Let me know what you think!

@tshauck
Contributor Author

tshauck commented Jun 21, 2024

Interesting... to your later point, you might consider opening a DRAFT PR. And if it's not too much, maybe do an initial one for case 3 then stack the draft of 4 and 5 on it? Case 3 is probably easy to get reviewed and merged, then we can chat about 4 and 5 potentially with additional eyes.

To the actual problem, the difference is obviously in the field qualifiers, so maybe there's a way to keep the right field qualifier from the column and/or update schema after it's done?
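Looking at the schema dump earlier in the thread, the right-hand fields differ in three places: the qualifier (`None` vs `right`), the name (`right.b` vs `b`), and the data type (`Null` vs `Int64`). The following std-only toy model sketches one way all three could be made to line up: cast the null literal to the target type, then alias it with the target's qualifier and name. Every type and function here (`Field`, `Expr`, `output_field`, `typed_null_for`) is hypothetical and only imitates the idea; whether the equivalent expressions pass the real `Projection` checks is not verified here.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DataType {
    Null,
    Int64,
}

/// Toy stand-in for one schema field plus its table qualifier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Field {
    pub qualifier: Option<String>,
    pub name: String,
    pub data_type: DataType,
}

/// Toy expression tree with just enough variants for this example.
pub enum Expr {
    Column(Field),
    NullLiteral,
    Cast(Box<Expr>, DataType),
    Alias(Box<Expr>, Option<String>, String),
}

/// Derive the output field an expression would produce in this toy model.
pub fn output_field(expr: &Expr) -> Field {
    match expr {
        Expr::Column(f) => f.clone(),
        // An untyped NULL has no qualifier and the Null data type.
        Expr::NullLiteral => Field {
            qualifier: None,
            name: "NULL".to_string(),
            data_type: DataType::Null,
        },
        // A cast changes only the data type.
        Expr::Cast(inner, ty) => Field {
            data_type: ty.clone(),
            ..output_field(inner)
        },
        // An alias changes only the qualifier and the name.
        Expr::Alias(inner, qualifier, name) => Field {
            qualifier: qualifier.clone(),
            name: name.clone(),
            data_type: output_field(inner).data_type,
        },
    }
}

/// Build a NULL expression whose output field equals `target`.
pub fn typed_null_for(target: &Field) -> Expr {
    let cast = Expr::Cast(Box::new(Expr::NullLiteral), target.data_type.clone());
    Expr::Alias(Box::new(cast), target.qualifier.clone(), target.name.clone())
}
```

In this model a bare `NullLiteral` never matches a field such as `right.b: Int64`, while `typed_null_for` reproduces it exactly, which mirrors the mismatch reported in the dump.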

@LorrensP-2158466
Contributor

Sorry, what do you mean by stack the draft for case 4 and 5 on top of the pr for 3?

@tshauck
Contributor Author

tshauck commented Jun 21, 2024

Like make a branch for case 3, open a PR for that, then branch off that one for the changes for cases 4 and 5. You could then rebase if case 3's PR got merged or if it makes more sense, you can merge the 4 and 5 branch into the case 3, then merge that into main... perhaps unnecessarily complicated, vs just two independent branches. Just thinking how to get the easy case merged while working through the more complex case.

@LorrensP-2158466
Contributor

Alright, I've never done the rebase thing, it does sound nicer than 2 separate branches, I guess I can try it out now. You'll see the PR for cases 1, 2 and 3 tomorrow.

@LorrensP-2158466
Contributor

LorrensP-2158466 commented Jun 26, 2024

So, I have been trying some different ways to make the last 2 cases work, but I can't find a simple, fast way. The problem lies in the fact that we can't project the left/right input with columns from the other input as null with the Projection node because it will check its schema with its input, and if these don't have matching columns, it will fail. I only see 2 other solutions at this point:

  • Leave it as it is; I suspect that the exec join nodes will handle empty join tables very efficiently
  • Create a new node, ProjectionExtension or SchemaExtension, which, given an input node and a set of schemas, will extend the input schema with columns from the extra schemas as NULL. For example, like this:
    Given Input: [a Int8, b Int8, c Int8] and extension: [d Int8, e Int8] the output schema will be:
    [a Int8, b Int8, c Int8, d Int8, e Int8]
    If we execute this node, the values from the input will be collected, and every other value will be null:
Input: [                 Output: [
    [1, 1, 1],          [1, 1, 1, NULL, NULL],
    [2, 2, 2],   ==>    [2, 2, 2, NULL, NULL],
    [2, 2, 2],          [2, 2, 2, NULL, NULL],
]                       ]
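The row-level behaviour of such a node can be sketched in a few lines of plain Rust. This is only an illustration of the proposed semantics, not a DataFusion execution node; `extend_with_nulls` is a hypothetical helper, and `None` stands for NULL.

```rust
/// Append `extra_columns` NULL values to every input row.
/// Input values become `Some(v)`; the added columns are all `None`.
pub fn extend_with_nulls(rows: &[Vec<i8>], extra_columns: usize) -> Vec<Vec<Option<i8>>> {
    rows.iter()
        .map(|row| {
            row.iter()
                .map(|v| Some(*v))
                .chain(std::iter::repeat(None).take(extra_columns))
                .collect()
        })
        .collect()
}
```

Calling it with the three input rows from the example and `extra_columns = 2` yields rows of five values each, the last two being `None`. An empty input produces an empty output, so the same node would also cover the case where both join inputs are empty.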

But I do have to say, this is a lot of work for this specific use case (I don't know any other atm).

If anyone has any tips or extra bits of knowledge, please share them, Thanks!

Update:
I guess we could also put a flag in the projection node that tells it that it may extend its input's schema? But that's also changing the API...
