WIP Enable converting filter -> join when an index is available. #2164

Closed
wants to merge 8 commits into from

Conversation

wangandi
Copy link
Contributor

@wangandi wangandi commented Mar 2, 2020

Resolves #1962

[Ignore what was here formerly] Scroll down for the updated description of the PR.



@wangandi wangandi force-pushed the filterjoin branch 2 times, most recently from d845e43 to 72fdaea Compare March 3, 2020 16:39
Comment on lines 121 to 129
Get { materialize.public.bar (u3) },
ArrangeBy { keys: [[]], Constant [["this"]] },
Copy link
Contributor

This is maybe an example of why we might want ScalarExpr in the join constraints. We want to introduce the constant into the expression, but doing so with a cross join will result in rough behavior.

Copy link
Contributor

@frankmcsherry frankmcsherry left a comment

This looks sane!

I think there are still some things to sort out in join planning, where we should be able to communicate that forming an arrangement of a constant collection is cheap. I don't think we do that at the moment, and I don't know what the negative implications are (possibly that we never create a delta query for such a join plan, because we can never find the collection arranged). From the examples, it looks like we do build delta queries, but probably only because the linear implementation forces an arrangement which the delta query picks up.

Also, per comment at least one of the examples highlights that if we need cross joins to introduce constant values that are subsequently used in indexes, we might be paying a high price in order to get the improved efficiency later in the join.

Comment on lines 147 to 165
#indexes on bar(a), foo(a), and foo(b) exist
query T multiline
explain plan for select foo.a, b, c, d, e from foo, bar where foo.a = bar.a and b = 'this'
----
Project {
outputs: [0 .. 2, 5, 6],
Join {
variables: [[(0, 0), (2, 0)], [(1, 0), (2, 1)]],
implementation: DifferentialLinear,
Get { materialize.public.bar (u3) },
ArrangeBy { keys: [[]], Constant [["this"]] },
ArrangeBy {
keys: [[#0, #1]],
Get { materialize.public.foo (u1) }
}
}
}
Copy link
Contributor Author

As discussed with Frank, this is not a good plan, and we should figure out some way of remedying it.

This was how the plan was produced:

  • First, the filter-to-join step noticed the filter on equality of b to 'this', so it changed the filter to a join between the foo(b) index (a.k.a. ArrangeBy{keys: [[#1]], Get {...(u1)}}) and the constant 'this'.
  • At some point the two joins got fused.
  • Then the join implementation determination step replaced the ArrangeBy{keys: [[#1]], Get {...(u1)}} with an ArrangeBy on something that is not already an index.

@wangandi wangandi force-pushed the filterjoin branch 2 times, most recently from f8b589d to 48a8885 Compare June 15, 2020 19:53
@wangandi wangandi marked this pull request as draft June 16, 2020 21:27
@quodlibetor quodlibetor changed the base branch from master to main July 11, 2020 18:29
@wangandi wangandi force-pushed the filterjoin branch 2 times, most recently from 7c3ce83 to 482d0fc Compare July 23, 2020 22:11
@wangandi wangandi marked this pull request as ready for review July 23, 2020 22:14
@wangandi
Copy link
Contributor Author

wangandi commented Jul 23, 2020

Description of new version of PR:

A filter gets converted into a semijoin if the filter has the form <column(s) for which an index exists> = <literal(s)>.

In the interest of reducing the size of the PR, I've decided to limit its scope as follows:

  • If a filter has been converted into a semijoin, but then the semijoin gets fused with other joins, the semijoin essentially gets converted back into a filter. The next PR will handle the interaction between the semijoin and joins, and allow delta/differential joins to perform semijoins in the middle if able. I have already added tests for interactions of this transform with join to test/sqllogictest/transform/index_planning.slt (formerly test/sqllogictest/index_planning.slt).
  • The semijoin conversion only happens when the index is on a ScalarExpr::Column as opposed to an arbitrary expression. The main motivation for this limitation is that JoinImplementation does not seem to handle joining on expressions yet. For example, if you have an index on foo(mod(a, 5)) and on bar(mod(a, 5)), you would expect a DeltaQuery, but instead you get:
  explain plan for select foo.a, b, c, d, e from foo, bar where mod(foo.a, 5) = mod(bar.a, 5)
  ----
  %0 =
  | Get materialize.public.foo (u1)
  | ArrangeBy ()

  %1 =
  | Get materialize.public.bar (u3)

  %2 =
  | Join %0 %1 (= (#0 % 5) (#3 % 5))
  | | implementation = Differential %1 %0.()
  | | demand = (#0..#2, #4, #5)
  | Project (#0..#2, #4, #5) 

Removing this limitation will be part of a third PR and will depend on #3044.

Minor changes:

  • I realized that there exists the folder test/sqllogictest/transform, so I moved *planning.slt into that folder.
  • Moved filter and constant rendering to their own files so that they can be called by delta_join.rs/join.rs
  • Commit 1 is just removing the long-unused FilterLifting transform. I recommend reviewing the commits individually.

Comment on lines -190 to -192
if let RelationExpr::ArrangeBy { .. } = inputs[index] {
// do nothing. We do not want to push down a filter and block
// usage of an index
Copy link
Contributor Author

this is ok to remove because join planning is capable of lifting filters out of the way in order to use an arrangement.

Also, removing this prevents filters not involved in the semijoin from becoming equivalence classes in the join.

Copy link
Contributor

removing this prevents filters not involved in the semijoin from becoming equivalence classes in the join.

I don't understand this!

Copy link
Contributor Author

Take this scenario:

  1. There is an index on foo(a).
  2. the query is SELECT b, c FROM foo WHERE a = 5 and b = 'this'

The FilterEqualLiteral changes the filter on column a to a join, so the RelationExpr looks roughly like this:

%0 =
| Get materialize.public.foo (u1)
| ArrangeBy (#0)

%1 =
| Join %0 (= #0 5)
| Filter (#1 = "this")
| Project (#1, #2)

The PredicatePushdown logic for Filter{Join{...}} works like this:

  1. Try to localize the predicate to a single input
  2. If the predicate localization fails, then push the predicate into the equivalence class.

If I don't delete these lines, localizing Filter(#1 = "this") automatically fails, so the predicate gets pushed into the equivalence class of the Join like this:

%0 =
| Get materialize.public.foo (u1)
| ArrangeBy (#0)

%1 =
| Join %0 (= #0 5) (#1 = "this")
| Project (#1, #2)

Copy link
Contributor

I'm sorry, I still don't understand. The plan you have looks great to me. Is the problem that the semijoin implementation doesn't handle additional filters?

Copy link
Contributor Author

Here is a better example of what happens if the lines are not removed. Equality filters that don't have a literal on one side get pushed into the join, and then their presence messes up join implementation planning.

# indexes exist on foo(b) and foo(a, b, c)
query T multiline
EXPLAIN PLAN FOR SELECT a FROM foo WHERE b = 'it' and a = c
----
%0 =
| Get materialize.public.foo (u1)
| ArrangeBy (#0, #1, #2)

%1 =
| Join %0 (= #0 #2) (= #1 "it")
| | implementation = Differential %0.(#0, #1, #2)
| | demand = (#0)
| Project (#0)

EOF
# index exists on foo(b)
query T multiline
EXPLAIN PLAN FOR SELECT a, c FROM foo WHERE b = 'it' and a = c
----
%0 =
| Get materialize.public.foo (u1)

%1 =
| Join %0 (= #0 #2) (= #1 "it")
| | implementation = Differential %0
| | demand = (#0)
| Project (#0)

EOF

@@ -277,4 +277,73 @@ where
panic!("render_join called on invalid expression.")
}
}

pub fn render_semi_join(
Copy link
Contributor

Comments here would help too! We might want to change the name; DD's semijoin is a generalization of the relational algebra semijoin to multisets, but I suspect most folks who use semijoins with SQL probably don't expect that.
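
To make the multiset point concrete, here is a minimal sketch of a semijoin over weighted records. It assumes that multiplicities multiply, which is my reading of how differential dataflow describes its semijoin; the type alias and function name are made up for illustration and are not part of this PR.

```rust
use std::collections::HashMap;

/// A record together with its multiplicity (how many copies exist).
type Weighted<T> = (T, i64);

/// Multiset semijoin (hypothetical helper): keep each `(key, value)` from
/// `left` whose key occurs in `right`, multiplying the two multiplicities.
fn multiset_semijoin(
    left: &[Weighted<(u32, &'static str)>],
    right: &[Weighted<u32>],
) -> Vec<Weighted<(u32, &'static str)>> {
    // Consolidate the right side so each key has a single multiplicity.
    let mut right_counts: HashMap<u32, i64> = HashMap::new();
    for (key, count) in right {
        *right_counts.entry(*key).or_insert(0) += count;
    }

    let mut out = Vec::new();
    for ((key, value), count) in left {
        if let Some(right_count) = right_counts.get(key) {
            let product = count * right_count;
            // Records whose multiplicities cancel to zero are dropped.
            if product != 0 {
                out.push(((*key, *value), product));
            }
        }
    }
    out
}
```

With a right-hand side whose keys all have multiplicity 1 this behaves like the relational semijoin; with a key that appears three times on the right, each matching left record comes out three times as often, which is the part SQL users may not expect.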

@frankmcsherry
Copy link
Contributor

frankmcsherry commented Jul 24, 2020

Comments spilling out, but: can you explain the motivation for semijoins over standard binary joins? Internally in differential, a semijoin is rendered as a standard binary join, just with a special arrangement of the right-hand data (more memory efficient). It feels like if the benefit is that alone it may be more complexity than is needed, but if there is another reason I'm all ears.

Comment on lines 302 to 305
return e.iter().find_map(|s| match s {
ScalarExpr::Literal(_, _) => Some(s.eval(&[], &temp_storage)),
_ => None,
});
Copy link
Contributor

I can't read this. I don't know where the return returns to. Some comments, or different control flow would help me!

Also, you may want ScalarExpr's function

    pub fn as_literal(&self) -> Option<Result<Datum, &EvalError>> {
        if let ScalarExpr::Literal(lit, _column_type) = self {
            Some(lit.as_ref().map(|row| row.unpack_first()))
        } else {
            None
        }
    }
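
As a rough illustration of the suggestion, the look-up could lean on an `as_literal`-style accessor instead of matching inside the closure. The `Expr` enum below is a hypothetical two-variant stand-in for `ScalarExpr` (no `Datum`, no `EvalError`), so this is only a sketch of the control flow, not the real API.

```rust
/// Hypothetical stand-in for `ScalarExpr`; only two variants are modeled.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Literal(i64),
}

impl Expr {
    /// Returns the literal value if this expression is a literal.
    fn as_literal(&self) -> Option<i64> {
        if let Expr::Literal(value) = self {
            Some(*value)
        } else {
            None
        }
    }
}

/// Finds the first literal in an equivalence class, if there is one.
fn first_literal(equivalence: &[Expr]) -> Option<i64> {
    equivalence.iter().find_map(Expr::as_literal)
}
```

Pulling the search into a named function also answers the "where does the return go" question, since the early exit is now the function's own return value.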

Comment on lines 1155 to 1162
/// Perform a semi join of a single input to a constant literal
///
/// Technically, both differential and delta query implementations will
/// also perform semijoins if the semijoin would shrink the size of the
/// arranged inputs. A join is designated as having this implementation
/// in the single input case where neither `Differential` nor `DeltaQuery`
/// make sense.
Semijoin,
Copy link
Contributor

My understanding of the Differential plan is that it specifies for each input which arrangement to use. Is there a reason that the Semijoin variant shouldn't just be the Differential variant with the appropriate arrangements?

Copy link
Contributor Author

Differential specifies which arrangement to use for each input except for the first one:

    /// The first argument indicates the index of the starting collection, and
    /// the sequence that follows lists other relation indexes, and the key for
    /// the arrangement we should use when joining it in.
    ///
    /// Each collection index should occur exactly once, either in the first
    /// position or somewhere in the list.
    Differential(usize, Vec<(usize, Vec<ScalarExpr>)>)

Copy link
Contributor

That's true, but couldn't you use the constant collection as the first argument, and the specific arrangement you want to select from as the second argument?

Copy link
Contributor Author

I had thought that you wanted the constants in the join constraints (see your Mar 4 comment above). Since the constants are in the join constraints, there is no constant collection, and often there is only one input.

Comment on lines 309 to 312
if let RelationExpr::ArrangeBy {
input: inner_input,
keys,
} = &inputs[0]
Copy link
Contributor

Afaict, if this test does not pass, we do nothing. Probably we should explicitly complain? Surely it doesn't work correctly on any other input?

Copy link
Contributor

@frankmcsherry frankmcsherry left a comment

I think you can land these changes if you feel strongly, but I think they introduce rather than pay down complexity. The semijoin operator seems like a special case of the general Differential join, and while I can imagine that it doesn't yet do what you need, I think we are going to need to make it do so eventually.

More generally, I don't have a lot of clarity here about the required properties to enable an indexed look-up. I would imagine we want to hit a state where we have e.g. Differential with predicates in it, and enough information to perform an implementation that introduces constants whenever needed (e.g., each input, including the first, could have an Option<Vec<ScalarExpr>> to indicate either the intended arrangement, or the absence of a preference (which would allow us to push filtering and demand information at it)). I think that would also allow you to plan look-ups from equality constraints, and also filter with the remaining constraints.
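
One possible shape for that, as a sketch only: every input in the join order, including the first, carries an optional arrangement key, and `None` stands for "no preference". The struct and method names are hypothetical, and keys are modeled as plain column numbers rather than `ScalarExpr`s to keep the example self-contained.

```rust
use std::collections::HashSet;

/// Hypothetical plan shape: every input, including the first, carries an
/// optional arrangement key. `None` means "no preference".
#[derive(Debug, Clone, PartialEq)]
struct DifferentialPlan {
    /// (input index, optional arrangement key), in join order.
    order: Vec<(usize, Option<Vec<usize>>)>,
}

impl DifferentialPlan {
    /// Checks that each of the `arity` join inputs occurs exactly once.
    fn is_valid(&self, arity: usize) -> bool {
        let mut seen = HashSet::new();
        self.order.len() == arity
            && self
                .order
                .iter()
                .all(|(input, _)| *input < arity && seen.insert(*input))
    }
}
```

The validity check mirrors the existing doc comment on `Differential` ("each collection index should occur exactly once"); the only change in this sketch is that the starting input is no longer a special case.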

I see that you have further PRs planned, but I can't tell if they unify implementations, or not.

@wangandi
Copy link
Contributor Author

More generally, I don't have a lot of clarity here about the required properties to enable an indexed look-up. I would imagine we want to hit a state where we have e.g. Differential with predicates in it, and enough information to perform an implementation that introduces constants whenever needed (e.g., each input, including the first, could have an Option<Vec<ScalarExpr>> to indicate either the intended arrangement, or the absence of a preference (which would allow us to push filtering and demand information at it)). I think that would also allow you to plan look-ups from equality constraints, and also filter with the remaining constraints.

Ah, upon thinking about this some more, I realize I had been under the false impression that there does not exist a 2+ input case where we would choose the Differential implementation and have a pre-existing arrangement on the first input. Thus, I had the impression that instead of adding a special case to the differential code, it would be cleaner to put it in its own method.

But I guess we could have a join like

select * 
from a, b, c
where a.col1 = b.col1  
  and b.col2 = c.col1

where the existing indexes are on a(col1), b(col1), and c(col1), so the index on b(col2) that a delta query would need is missing.

@wangandi
Copy link
Contributor Author

Regarding the required properties for an indexed look-up, I should add this to comments in the transformations, but here's how I think about it:

  1. For an index to be used in a look up, you need there to be equality predicates on a literal for all columns of the index. So to use an index on a(col1, col2), you need there to be the predicate a.col1 = lit1 and a.col2 = lit2.
  2. For a single input join, you can only have one index, so pick the index that encompasses the most predicates.
  3. For a multi input join, I consider reducing work on the join to be more important than making use of filters. My original plan was to just ignore any filters in making a join plan, and then put predicates back in if the join plan involves an index that the predicates could make use of. But upon further thought, there are cases like
select * 
from a, b, c
where a.col1 = b.col1  
  and b.col2 = c.col1
  and c.col2 = a.col2

where if your available indexes are a(col1), b(col2), and c(col2), if you have a predicate c.col2 = 5, that would influence the particular Differential plan order. So I should add presence of filters to the characteristics either in this PR or the next.
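
Rules 1 and 2 above can be sketched roughly as follows. This is an illustration under simplifying assumptions, not the code in this PR: index keys are lists of column numbers, `literal_columns` is the set of columns that have a `column = literal` predicate, and the function name is hypothetical.

```rust
use std::collections::HashSet;

/// Picks the index key to use for a literal look-up on a single input.
/// `indexes` lists the available index keys (as column numbers);
/// `literal_columns` holds the columns constrained by `column = literal`.
fn best_lookup_index<'a>(
    indexes: &'a [Vec<usize>],
    literal_columns: &HashSet<usize>,
) -> Option<&'a Vec<usize>> {
    indexes
        .iter()
        // Rule 1: every column of the index needs an equality to a literal.
        .filter(|key| !key.is_empty() && key.iter().all(|col| literal_columns.contains(col)))
        // Rule 2: prefer the index that covers the most predicates.
        .max_by_key(|key| key.len())
}
```

For example, with indexes on (#0), (#0, #1) and (#2), and literal predicates on #0 and #1, this picks (#0, #1). If only #0 has a literal predicate, the two-column index is not usable and (#0) is chosen instead.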

@wangandi
Copy link
Contributor Author

wangandi commented Aug 4, 2020

Comments spilling out, but: can you explain the motivation for semijoins over standard binary joins? Internally in differential, a semijoin is rendered as a standard binary join, just with a special arrangement of the right-hand data (more memory efficient). It feels like if the benefit is that alone it may be more complexity than is needed, but if there is another reason I'm all ears.

Suppose you have a join like

select * from a, b where a.col1 = b.col1 and a.col1 = 5

and you have an index on a(col1).

The query would be rendered roughly like: Join{b , Join{Constant(5), ArrangeBy{a, [col1]}}}

If you reuse the standard binary join in Differential join to do the inner join, the result of the inner join would be a collection, and you would need to add an additional map and an additional arrangeby operator to get it into shape for the outer join. Whereas if you use semijoin for the inner join, it would automatically result in an arrangement on the right column.

@wangandi
Copy link
Contributor Author

Changelog:

  • Filters on indexes work in conjunction with both differential and delta joins.
  • If there are multiple indexes that we could use in conjunction with filters, FilterEqualLiteral puts all the filters into the equivalences of the join and relies on JoinImplementation to decide which indexes are actually used. Suppose that an input has two arrangements, one on (#0) and one on (TODO #1), and we desire to run two filters #0 = x and #1 = y. Previously, FilterEqualLiteral would pick one of the filters to convert into a join. Now, if the input is part of a DeltaQuery, and both arrangements are needed for the delta query, both filters will get converted to a join.
  • The rules for determining an arrangement for the starting input for differential join are now:
  1. If there is an arrangement that lines up with the arrangement for the second input, use that arrangement.
  2. Otherwise, use the largest index for which there exists a set of filters that can make use of it. (This is new.)
  3. Otherwise, use no index.
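
The three rules read as a simple cascade. The sketch below is a simplified model, not the actual `find_start_characteristics` code: keys are lists of column numbers, `key_matching_second_input` is assumed to already be expressed in terms of the starting input's columns, and all names are hypothetical.

```rust
use std::collections::HashSet;

/// Chooses the arrangement key for the starting input of a differential join.
/// Returns `None` when no index should be used.
fn choose_start_key(
    available: &[Vec<usize>],
    key_matching_second_input: Option<&Vec<usize>>,
    literal_columns: &HashSet<usize>,
) -> Option<Vec<usize>> {
    // 1. An arrangement that lines up with the second input's arrangement.
    if let Some(wanted) = key_matching_second_input {
        if available.contains(wanted) {
            return Some(wanted.clone());
        }
    }
    // 2. The largest index whose columns are all filtered by a literal.
    // 3. Otherwise `None`: use no index.
    available
        .iter()
        .filter(|key| !key.is_empty() && key.iter().all(|c| literal_columns.contains(c)))
        .max_by_key(|key| key.len())
        .cloned()
}
```

Ordering the checks this way encodes the stated priority: lining up with the join wins over using a filter, and the filter-driven index is only a fallback.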

Better viewed with hide whitespace changes turned on.
Ran full SLT to ensure that I would not be breaking correctness on main again.

TODO in a future commit:

  • Push back down predicates if, due to the decision of JoinImplementation, we did not want to convert the filter to a join.

Copy link
Contributor

@justinj justinj left a comment

I think I missed a lot of the context on this change, so forgive me if I'm retreading ground. The idea seems to be: when we have an expression that looks like (Filter x p), where p is a conjunction of equalities between columns and constants, sometimes we will happen to have an arrangement sitting around of x, arranged by one or more of the columns referenced by p. In such a case, it's valuable to re-use that arrangement and rewrite (Filter x p) as (Filter (Join (ArrangeBy x ...) r p') p'') where p' is the set of equalities between x, and p'' is the set of remaining conjuncts.

At any rate, I'm a little iffy on when/where/why explicit ArrangeBys exist, and based on conversations it sounds like we don't have a particularly well-defined answer for that, so that's something I would probably want to make more explicit for something like this? Not sure if there's sufficient prior art, though.

) -> (Characteristics, Vec<ScalarExpr>, usize) {
// best start characteristic: an arrangement that lines up with the keys of
// the second input
if let Some((_, key, second)) = self.order.get(0) {
Copy link
Contributor

I can't understand this code. Maybe I'm in the minority here, but I feel like all the code like this in transform really needs some higher level abstractions to make it more comprehensible.

Copy link
Contributor

To be clear, I think this is not a problem with this code in particular, but this pattern that shows up everywhere, I just don't know how to understand it!

@@ -808,4 +771,100 @@ impl<'a> Orderer<'a> {
}
}
}

fn find_start_characteristics(
Copy link
Contributor

Can this function have a comment that explains at a high-level what it's doing/what the various input/outputs are?

// find set of keys of the largest size that is a subset of columns
let best_index = key_set
// find all sets of keys for which every column
// corresponds to an equality to a literal
Copy link
Contributor

nit: i think some punctuation in this comment would make it easier to parse, since there's two distinct sentences here

let mut support_iter = support.iter();
if let Some(first) = support_iter.next() {
let input = input_relation[*first];
if support_iter.all(|col| input_relation[*col] == input) {
Copy link
Contributor

could this be simplified a bit by having a bound_by function or something that checks if a given expression is bound entirely by one of the join inputs (I think that's what this is doing?)?
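
A helper along those lines might look like the sketch below. It assumes `support` is the set of global column numbers an expression references and `input_relation[col]` is the join input that column comes from; the name `bound_by` and the slice-based signature are only a suggestion, not existing code.

```rust
/// Returns the join input that binds every column in `support`, or `None`
/// if the columns span several inputs (or `support` is empty).
/// `input_relation[col]` gives the input that global column `col` comes from.
fn bound_by(support: &[usize], input_relation: &[usize]) -> Option<usize> {
    let (first, rest) = support.split_first()?;
    // Out-of-range columns are treated as "not bound" rather than panicking.
    let input = *input_relation.get(*first)?;
    if rest.iter().all(|col| input_relation.get(*col) == Some(&input)) {
        Some(input)
    } else {
        None
    }
}
```

The call site would then reduce to a single `if let Some(input) = bound_by(...)`, which is arguably easier to scan than the iterator dance.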

@wangandi
Copy link
Contributor Author

At any rate, I'm a little iffy on when/where/why explicit ArrangeBys exist, and based on conversations it sounds like we don't have a particularly well-defined answer for that, so that's something I would probably want to make more explicit for something like this? Not sure if there's sufficient prior art, though.

Now that I think about it, the best reason for why explicit ArrangeBys exist is that if you create an index, the dataflow layer gets a command to render ArrangeBy{Get{...}, keys}.

@frankmcsherry
Copy link
Contributor

frankmcsherry commented Oct 1, 2020

At any rate, I'm a little iffy on when/where/why explicit ArrangeBys exist, and based on conversations it sounds like we don't have a particularly well-defined answer for that, so that's something I would probably want to make more explicit for something like this?

I think this is something we could try to articulate more clearly. There are a few things that ArrangeBy does, and maybe if we are clear about which of them are the intended uses it might free us up a bit.

  1. The main thing that ArrangeBy does is force the arrangement of a collection by some keys, and make that arrangement available for others. It is (or should be, I think) a blocker for certain transformations like predicate pushdown, projection pushdown, and maybe map lifting, among others. Unfortunately, arrangement re-use is not currently expressed through a Let, Get style binding, so it is hard to know what the downside of tweaking the innards of an arrangement is (will it cause several re-use hits to vanish; who knows?).

  2. It presents our expectation about used arrangements outward, so that explain plan and such gets a better view on where arrangements are used. I think this is the primary reason they get introduced at the moment around the join code, but @wangandi should correct me. This may be less critical as join implementations indicate (inelegantly, perhaps) how they plan to arrange data. This is also a bit awkward because ArrangeBy exists even when no new arrangement needs to be formed (e.g. after a Reduce).

  3. It locks in some known arrangements so that re-optimization can take advantage of them. This happens with join planning, where a different join may result in some input arrangements being formed, and with them formed a delta join now becomes possible and preferable. By leaving the ArrangeBy in the plan, we can see that this is planned to happen, even if there is no re-use otherwise, nor any humans who are looking at the plan.

All that being said, the above is descriptive rather than prescriptive. We didn't start with the above and then start putting in ArrangeBy is my understanding (I certainly didn't).

Each of the reasons above seems non-terrible. Number 2 isn't something I'm passionate about, and maybe if that use vanished we might have fewer of them around. Especially if they are blocking other optimizations, or complicating anything.

Comment on lines +78 to +83
// filter arrangements using semijoins if possible
let mut equivalences = equivalences.clone();
for equivalence in equivalences.iter_mut() {
equivalence.sort();
equivalence.dedup();
}
Copy link
Contributor

I'm not clear on how the comment relates to the code. This looks to be normalizing equivalences more than anything else?

Comment on lines +85 to +87
// if possible, shorten input arrangements by joining them with a
// constant row. Create a map to look up the shortened
// arrangements to use
Copy link
Contributor

I'm confused as to what is about to happen here. My understanding of delta joins is that they are invoked when we have all existing arrangements and do not need to create new ones. This description sounds like it is proposing introducing some new joins for the purposes of filtering, which looks like it will create a new arrangement in its implementation.

/// size 1.
/// Return the RelationExpr, if any, that was rendered and the constants
/// in the rendered RelationExpr
pub fn filter_on_index_if_able(
Copy link
Contributor

This method worries me a lot. Here are some thoughts:

  1. This looks like it is starting to introduce bushy tree joins, instead of linear joins. Nothing fundamentally bad about bushy tree joins, but I don't think render.rs is the right place to place the brains about when and how to restructure join plans. I'd 100% like to start with linear plans for this PR if we can. If we cannot, I'd really like to have that spelled out somewhere.

  2. This doesn't result in an arrangement for the result RelationExpr as far as I can tell, which seems odd as elsewhere (delta joins at least) the code appears to panic if we don't have the arrangement available. Or perhaps it silently makes the arrangement; I'm not sure which at the moment. But that arrangement is a potential cost for implementing the filtering this way, and isn't a clear win that could entitle render.rs to make that change.

Copy link
Contributor Author

Re: 1)
We could avoid bushy tree joins in RelationExpr by having JoinImplementation push filterjoins down to the inputs.

So instead of

Join {
  inputs: [Get{a}, Get{b}],
  equivalences: [[#0, #2, 'hello']]
}

the RelationExpr could become

Join{ 
  inputs: [
    Join{inputs:[Get{a}], equivalences: [[#0, 'hello']]},
    Join{inputs:[Get{b}], equivalences: [[#2, 'hello']]},
  ],
  equivalences: [[#0, #2]],
}
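
The rewrite above could be sketched as a small function that splits each equivalence class into per-input `[column, literal]` classes plus whatever column-only class remains for the outer join. Everything here is hypothetical: `Expr` stands in for `ScalarExpr`, columns keep their global numbering (as in the example above), and the function indexes `input_relation` directly, so it panics on out-of-range columns.

```rust
/// Hypothetical stand-in for `ScalarExpr`: a global column or a string literal.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(usize),
    Literal(String),
}

/// Splits join equivalences into per-input literal classes and the remaining
/// column-only equivalences. `input_relation[col]` is the input owning `col`.
fn push_down_literals(
    equivalences: &[Vec<Expr>],
    input_relation: &[usize],
    num_inputs: usize,
) -> (Vec<Vec<Vec<Expr>>>, Vec<Vec<Expr>>) {
    let mut per_input = vec![Vec::new(); num_inputs];
    let mut remaining = Vec::new();
    for class in equivalences {
        let literal = class.iter().find(|e| matches!(e, Expr::Literal(_)));
        let columns: Vec<Expr> = class
            .iter()
            .filter(|e| matches!(e, Expr::Column(_)))
            .cloned()
            .collect();
        match literal {
            // No literal: the class stays on the outer join untouched.
            None => remaining.push(class.clone()),
            Some(literal) => {
                // Each column gets its own `[column, literal]` class on its input.
                for column in &columns {
                    if let Expr::Column(c) = column {
                        per_input[input_relation[*c]]
                            .push(vec![column.clone(), literal.clone()]);
                    }
                }
                // Column-to-column equalities stay on the outer join.
                if columns.len() > 1 {
                    remaining.push(columns);
                }
            }
        }
    }
    (per_input, remaining)
}
```

For the example above, with a owning #0 and #1 and b owning #2 and #3, the single class [#0, #2, 'hello'] yields [#0, 'hello'] for a, [#2, 'hello'] for b, and [#0, #2] for the outer join.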

Re: 2)
This does result in an arrangement for the resulting RelationExpr. (see lines 499-501, 504-506)

let result = oks.semijoin(&constant_collection).arrange_named("Semijoin");
let es = errs.concat(&es.as_collection(|k, _v| k.clone())).arrange();
self.set_local(&join_expr, keys, (result, es));

Copy link
Contributor

Re: 2, sorry! I'm still a bit confused because delta joins may expect multiple arrangements, by specific keys, and the arrange_named doesn't seem to do that.

Copy link
Contributor

I guess the scarier part of 2 for me is that creating a new arrangement that the join did not ask for is a big responsibility, and we might prefer to make this clearer.

Copy link
Contributor Author

@wangandi wangandi left a comment

I'm confused as to what is about to happen here. My understanding of delta joins is that they are invoked when we have all existing arrangements and do not need to create new ones. This description sounds like it is proposing introducing some new joins for the purposes of filtering, which looks like it will create a new arrangement in its implementation.

I think the fundamental question for this PR is what takes precedence in these cases:

  1. select * from foo, bar where foo.a = bar.a and foo.b = 5, indexes exist for foo(a), foo(b), and bar(a).
    I have in this PR that the join always takes precedence over the filter in this case so the resulting plan is
%0
| Get foo

%1
| Get bar

%2
| Join %0 %1 (=#0 #2) 
| | DeltaQuery %0 %1(#0)| %1 %0(#0) 
| Filter(#1 = 5)
  2. select * from foo, bar where foo.a = bar.a and foo.a = 5 and indexes on foo(a) and bar(a) exist.
    I have in this PR that the filter does get turned into a join, and then the result of the filter is arranged for use in the join. So this results in a bushy tree style join, which would look like this if the actions in the render step were stated explicitly.
%0
| Get foo

%1
| Get bar

%2
| Join %0 (=#0 5) 
| | Differential %0(#0)
| ArrangeBy(#0) 

%3
| Join %1 (=#0 5)
| | Differential %1(#0)
| ArrangeBy(#0) 

%4
| Join %0 %1 (= #0 #2) 
| | DeltaQuery %0 %1(#0)| %1 %0(#0) 
  3. select * from foo, bar where foo.a = bar.a and foo.a = 5 and only an index on foo(a) exists.
    Currently in this PR, the same precedence rule as (2) applies, so the plan looks like this if the bushy-style join in the render pipeline were explicitly expressed in the RelationExpr:
%0
| Get foo

%1
| Get bar

%2
| Join %0 (=#0 5) 
| | Differential %0(#0)
| ArrangeBy(#0) 

%3
| Join %1 %2 (= #0 #2) 
| | Differential %1 %2(#0) 

Now that I look at it, the plans for (2) and (3) are silly since if we are joining Filter(Get(foo), a=5) on a Filter(Get(bar), a=5) on foo.a = bar.a, it effectively becomes a cross join. It might be a less silly structure, though, if we expand the filter->join to include filters like (foo.a = 5 or foo.a = 6).

@wangandi
Copy link
Contributor Author

wangandi commented Oct 1, 2020

(4) Another case: select * from foo, bar, baz where foo.a = bar.a and foo.a = 5 and bar.b = baz.b. Indexes exist for foo(a), bar(a), bar(b), baz(b).
The way the current PR is written, the three input join becomes a DeltaQuery using the arrangements ArrangeBy{Filter{Get(foo), a = 5}, a}, ArrangeBy{Filter{Get(bar), a = 5}, a}, ArrangeBy{Get(bar), b}, ArrangeBy{Get(baz), b}.

Would it be preferable for the join to be instead planned as

%0
| Get bar
| ArrangeBy(#1)

%1
| Get baz
| ArrangeBy(#1)

%2
| Join %0 %1 (= #1 #3) 
| | DeltaQuery %0 %1(#1) | %1 %0 (#1) 
| Filter (#0 = 5)

%3
| Get foo
| ArrangeBy(#0) 

%4
| Join %3 (= #0 5)
| | Differential %3 (#0) 

%5
| Join %2 %4 (= #0 #4) 
| | Differential %2 %4()

@frankmcsherry
Copy link
Contributor

My main ask for clarity is that we migrate transformations in join structure from render up and into the plan itself. I find it very appealing to distinguish between

Join {
  inputs: [Get{a}, Get{b}],
  equivalences: [[#0, #2, 'hello']]
}

and

Join{ 
  inputs: [
    Join{inputs:[Get{a}], equivalences: [[#0, 'hello']]},
    Join{inputs:[Get{b}], equivalences: [[#2, 'hello']]},
  ],
  equivalences: [[#0, #2]],
}

where the latter is a clearer indication that we should use a specific indexed join with each of the inputs. I would really like to keep these sorts of decisions out of the renderer, if at all possible, and just have them provided. Similarly with delta joins: that implementation plan has historically corresponded to a particular dataflow implementation, and if we want to vary from that (e.g. by filtering an input using a semijoin) we are able to express that in RelationExpr, and doing so makes the renderer's life easier.

@wangandi
Copy link
Contributor Author

Superseded by #4524

@wangandi wangandi closed this Oct 26, 2020

Successfully merging this pull request may close these issues.

Convert filters on index fields into joins
3 participants