
Use MapFilterProject and arrangement keys for fast path queries #4649

Merged
merged 7 commits into from Oct 29, 2020

Conversation


@frankmcsherry frankmcsherry commented Oct 28, 2020

Our fast path queries are able to directly read values out of arrangements when the query has a certain "easy" structure.

This PR broadens that structure to include arbitrary map, filter, and project statements around a Get for a materialized arrangement. Additionally, the filter statements are used to drive index selection, where we attempt to choose an index whose keys are all constrained to be literal values, allowing us to seek directly to the key in each worker (and in some glorious future, not even bother the workers we know will not have the key).
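To picture the shape of the operator involved, here is a minimal, self-contained sketch of a MapFilterProject-style evaluation over integer rows. The struct and function here are hypothetical stand-ins, not Materialize's actual `MapFilterProject`: map expressions append computed columns, predicates filter rows, and a projection selects the output columns.

```rust
// Hypothetical stand-in for a MapFilterProject operator; not Materialize's
// actual types. Rows are plain i64 columns for simplicity.
struct Mfp {
    maps: Vec<fn(&[i64]) -> i64>,     // expressions appended as new columns
    filters: Vec<fn(&[i64]) -> bool>, // predicates over all columns
    project: Vec<usize>,              // which columns (by index) to emit
}

impl Mfp {
    /// Apply maps, then filters, then the projection; `None` means the
    /// row was filtered out.
    fn evaluate(&self, row: &[i64]) -> Option<Vec<i64>> {
        let mut datums = row.to_vec();
        for map in &self.maps {
            let value = map(&datums[..]);
            datums.push(value);
        }
        if !self.filters.iter().all(|pred| pred(&datums[..])) {
            return None;
        }
        Some(self.project.iter().map(|&i| datums[i]).collect())
    }
}

fn main() {
    // Roughly: SELECT col0 + col1, col0 FROM data WHERE col0 + col1 > 5.
    let mfp = Mfp {
        maps: vec![|d| d[0] + d[1]],
        filters: vec![|d| d[2] > 5],
        project: vec![2, 0],
    };
    assert_eq!(mfp.evaluate(&[4, 3]), Some(vec![7, 4]));
    assert_eq!(mfp.evaluate(&[1, 2]), None); // filtered out
    println!("ok");
}
```

The point of the fast path is that a pipeline of this shape can be applied directly to values read out of an existing arrangement, without building a new dataflow.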

Some additional performance changes were made to how the worker reads results out of arrangements. They are generally improvements, but can be a performance regression for ORDER BY .. LIMIT <large> from a large collection. Mainly, we no longer extract all records and then quick-select them, but rather maintain something like a priority queue, performing amortized per-record work logarithmic in the limit. For large limits this can be worse, but for small limits we avoid allocations proportional to the number of records in the arrangement. If the PR lands we should monitor this to see if the regressions in the uncommon case (large limit) are something we should improve.
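The "maintain roughly the top so-many results" strategy described above can be sketched as follows. This is a hedged illustration under simplified assumptions, not the PR's actual code: keep a bounded buffer and, whenever it grows past twice the limit, partition it so only the best `limit` entries survive, so memory stays proportional to the limit rather than to the full collection.

```rust
use std::cmp::Ordering;

/// Keep (approximately online) the `limit` smallest items under `cmp`,
/// using a buffer that never exceeds `2 * limit` entries. A hypothetical
/// sketch of the strategy, not the PR's actual implementation.
fn top_k<T, F>(items: impl IntoIterator<Item = T>, limit: usize, mut cmp: F) -> Vec<T>
where
    F: FnMut(&T, &T) -> Ordering,
{
    if limit == 0 {
        return Vec::new();
    }
    let mut buffer: Vec<T> = Vec::with_capacity(2 * limit);
    for item in items {
        buffer.push(item);
        if buffer.len() > 2 * limit {
            // Move the best `limit` items to the front, then drop the rest.
            // Each flush is linear in the buffer, so the per-record cost
            // stays bounded and memory is proportional to the limit.
            buffer.select_nth_unstable_by(limit - 1, &mut cmp);
            buffer.truncate(limit);
        }
    }
    buffer.sort_by(&mut cmp);
    buffer.truncate(limit);
    buffer
}

fn main() {
    let data = vec![5, 1, 9, 3, 7, 2, 8, 4, 6, 0];
    assert_eq!(top_k(data, 3, |a, b| a.cmp(b)), vec![0, 1, 2]);
    println!("ok");
}
```

This also makes the regression mentioned above visible: when `limit` approaches the collection size, the bounded buffer buys nothing and the extra bookkeeping can lose to extract-then-quick-select.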



fixes #4579

@frankmcsherry

The rough performance characteristics here are, on a 700MB file loaded on one worker:

Selecting on a column that is not indexed:

materialize=> select * from data where column3 = '2018-01-01 00:21:05';
...
Time: 2371.181 ms (00:02.371)

Selecting on a column that is indexed:

materialize=> select * from data where column2 = '2018-01-01 00:21:05';
...
Time: 1.081 ms


@justinj justinj left a comment


With this change, if I execute a query like SELECT * FROM foo WHERE p, and p is not able to allow for a seek into an index, will we now read the entire contents of foo into the coordinator (as opposed to building a dataflow that did the filtering)? If yes, is that a problem?



src/coord/src/coord.rs, line 1840 at r4 (raw file):

                        .iter()
                        .map(|(id, exprs)| {
                            let literal_row = map_filter_project.literal_constraints(exprs);

Should this extract the literals and leave the residual MFP so we don't double-evaluate the filters? Probably not worth the complexity right now?
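The index-selection step under discussion can be pictured with a simplified sketch. The types here are hypothetical stand-ins; the real `literal_constraints` works over MFP expressions, not a map of strings. The idea: for each index, check whether every key column is pinned to a literal by the filters, and take the first index where that holds.

```rust
// Simplified sketch with hypothetical types: real index selection works
// over MapFilterProject expressions, not a HashMap of strings.
use std::collections::HashMap;

/// A literal value that filter predicates pin a column to,
/// e.g. `column2 = '2018-01-01 00:21:05'`.
type Literal = String;

/// Return `(index position, literal key)` for the first index whose key
/// columns are all constrained to literals; `None` means every index
/// would require a scan.
fn choose_index(
    indexes: &[Vec<usize>],                // the key columns of each index
    constraints: &HashMap<usize, Literal>, // column -> literal, from filters
) -> Option<(usize, Vec<Literal>)> {
    indexes.iter().enumerate().find_map(|(pos, key_cols)| {
        key_cols
            .iter()
            .map(|col| constraints.get(col).cloned())
            .collect::<Option<Vec<_>>>() // None if any key column is unconstrained
            .map(|key| (pos, key))
    })
}

fn main() {
    // One index on column0, one on column2; the filter pins column2.
    let indexes = vec![vec![0], vec![2]];
    let mut constraints = HashMap::new();
    constraints.insert(2, "2018-01-01 00:21:05".to_string());
    let choice = choose_index(&indexes, &constraints);
    assert_eq!(choice, Some((1, vec!["2018-01-01 00:21:05".to_string()])));
    println!("ok");
}
```

With a fully constrained key in hand, each worker can seek directly to that key in the arrangement instead of scanning it.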


src/dataflow/src/server.rs, line 934 at r3 (raw file):

        // to leave the data as `[Datum]` and use this representation to
        // maintain the top so-many results. We would love to use a heap
        // to maintain this efficiently, but Rust makes this tricky.

nit: Can you expand on this a bit? Or maybe make it more precise? I think it raises more questions than it answers as it is


src/dataflow/src/server.rs, line 940 at r3 (raw file):

        // `order_by` field. Further limiting will happen when the results
        // are collected, so we don't need to have exactly this many results,
        // just at least the results.

nit: s/at least the/at least this many/?


src/dataflow/src/server.rs, line 950 at r3 (raw file):

                if let Some(result) = self
                    .map_filter_project
                    .evaluate(&mut datums, &arena, &mut row_packer)

For large result sets are we at all concerned about reusing the arena here rather than clearing it after each row?


src/dataflow/src/server.rs, line 967 at r3 (raw file):

                    }

                    // TODO: In and ORDER BY .. LIMIT .. setting, once we have a full output

nit: In an

@frankmcsherry

  1. No, we should not. The MFP gets pushed to the workers, who apply it before returning results, so the coordinator never receives the unfiltered contents.

Should this extract the literals and leave the residual MFP so we don't double-evaluate the filters? Probably not worth the complexity right now?

It's a good idea. I'm glad I didn't do it in another case, because I got the key selection logic wrong (too many keys considered), but I think probably yes we could / should do that. Lemme ponder whether it is too clever.

For large result sets are we at all concerned about reusing the arena here rather than clearing it after each row?

I wasn't, but it's a good question. I'm not sure where we draw the line between "per record", "per collection", and "unbounded over time". We've distinguished between the first and the last before, but I don't know that we've thought hard about the middle one. I suppose ditching on each record is the safest.

Comment on lines 939 to 940
// maintain the top so-many results. We would love to use a heap
// to maintain this efficiently, but Rust makes this tricky.

Why wouldn't we be able to use a BTreeMap as the heap?

@frankmcsherry

Afaik, it will only sort things by their Ord implementation, whereas we have a custom ordering method.
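One workaround, sketched below under the stated assumption that the ordering is fixed and known at the type level (so it is not a drop-in replacement for a runtime comparator, which is the actual constraint here), is a newtype whose `Ord` implementation encodes the custom ordering; standard collections like `BinaryHeap` then order by it.

```rust
use std::cmp::Ordering;
use std::collections::BinaryHeap;

/// Hypothetical example: order rows by their second field, then the
/// first, standing in for an ORDER BY that differs from the row's
/// derived `Ord`. This only works when the ordering is known statically.
#[derive(PartialEq, Eq)]
struct ByOrderBy((i64, i64));

impl Ord for ByOrderBy {
    fn cmp(&self, other: &Self) -> Ordering {
        let (a1, b1) = self.0;
        let (a2, b2) = other.0;
        (b1, a1).cmp(&(b2, a2))
    }
}
impl PartialOrd for ByOrderBy {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // A max-heap bounded to the two smallest rows under the custom order.
    let mut heap: BinaryHeap<ByOrderBy> = BinaryHeap::new();
    for row in [(1, 9), (2, 3), (3, 7), (4, 1)] {
        heap.push(ByOrderBy(row));
        if heap.len() > 2 {
            heap.pop(); // evict the current worst row
        }
    }
    let mut kept: Vec<(i64, i64)> = heap.into_iter().map(|r| r.0).collect();
    kept.sort_by_key(|&(a, b)| (b, a));
    assert_eq!(kept, vec![(4, 1), (2, 3)]);
    println!("ok");
}
```

When the comparator is only available at runtime (a closure built from the query's `order_by`), this trick does not apply, which is the difficulty alluded to above.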

@frankmcsherry

Should this extract the literals and leave the residual MFP so we don't double-evaluate the filters? Probably not worth the complexity right now?

On further consideration, this is non-trivial in that we don't want to remove the predicate until we decide which one to use, and that doesn't have a clear mechanism for feeding back.

For large result sets are we at all concerned about reusing the arena here rather than clearing it after each row?

Being conservative is still probably best for now, but another case where we might want to keep the arena around would be if we wanted to maintain a Vec<Datum<'a>> instead of rows, to make life easier for the continual reordering that limits require. In this case, the arena would need to live as long as those borrows, which could make continually purging it more complicated (probably not impossible, but we might need to pivot the control flow so that flushing sits at the root of one of the loops).

@frankmcsherry

@justinj I shortened the lifetime of the arena, though it had an annoying effect on some other buffer re-use (effectively, there is none as they cannot outlive the arena, from which they might borrow). But, it seems like avoiding tanking memory use when someone has a mysterious arena-using expression is better than avoiding transient allocations, until we think harder about how to avoid both.
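The tradeoff being discussed can be illustrated with a toy arena; the types below are stand-ins, not Materialize's `RowArena`. A fresh arena per row keeps peak memory bounded by a single row's temporaries, at the cost of forgoing buffer reuse, because borrows from the arena cannot outlive it.

```rust
/// A toy bump-style arena: owns temporary values and hands out borrows
/// tied to its lifetime. Not Materialize's actual RowArena.
struct Arena {
    storage: Vec<String>,
}

impl Arena {
    fn new() -> Self {
        Arena { storage: Vec::new() }
    }
    /// Store a temporary value; the returned borrow cannot outlive the arena.
    fn alloc(&mut self, s: String) -> &str {
        self.storage.push(s);
        self.storage.last().unwrap()
    }
}

/// An expression evaluation whose intermediate result lives in the arena.
fn evaluate_row(arena: &mut Arena, row: &str) -> String {
    let upper = arena.alloc(row.to_uppercase());
    format!("{}!", upper)
}

fn main() {
    let rows = ["a", "b", "c"];
    let mut out = Vec::new();
    for row in &rows {
        // Per-row arena: temporaries are freed after each row, so peak
        // memory does not grow with the number of rows processed, but
        // nothing borrowed from the arena can be reused across rows.
        let mut arena = Arena::new();
        out.push(evaluate_row(&mut arena, row));
    }
    assert_eq!(out, vec!["A!", "B!", "C!"]);
    println!("ok");
}
```

Hoisting the arena out of the loop would allow reuse, but then every borrow taken from it stays live for the whole result set, which is exactly the memory growth the shortened lifetime avoids.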

@justinj

justinj commented Oct 29, 2020

LGTM!

@frankmcsherry frankmcsherry merged commit a8d8743 into MaterializeInc:main Oct 29, 2020
@frankmcsherry frankmcsherry deleted the mfp_peeks branch March 8, 2022 13:25

Successfully merging this pull request may close these issues.

Use MapFilterProject and indexed lookups for "fast path" SELECT queries