API in ParquetExec to pass in RowSelections to ParquetExec (enable custom indexes, finer grained pushdown) #9929

Closed
alamb opened this issue Apr 3, 2024 · 15 comments · Fixed by #10813
Labels
enhancement New feature or request


@alamb
Contributor

alamb commented Apr 3, 2024

Is your feature request related to a problem or challenge?

We are building / testing a specialized index for data stored in parquet that can tell us which row offsets are needed from the parquet file, based on additional information.

Currently, the parquet-rs reader allows specifying this type of information via ArrowReaderBuilder::with_row_selection.

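However, the DataFusion ParquetExec has no way to pass this information down. It does build its own RowSelection internally (for example, when pruning with the page index), but it cannot accept one from the caller.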

Describe the solution you'd like

What I would like is a way to provide something like a RowSelection for each row group.

Describe alternatives you've considered

Here is one possible API:

let parquet_selection = ParquetSelection::new()
  // * rows 100-250 from row group 1
  .select(1, RowSelection::from(vec![
    RowSelector::skip(100),
    RowSelector::select(150)
  ]))
  // * rows 50-100 and 200-300 in row group 2
  .select(2, RowSelection::from(vec![
    RowSelector::skip(50),
    RowSelector::select(50),
    RowSelector::skip(100),
    RowSelector::select(100),
  ]));

let parquet_exec = ParquetExec::new(...)
  .with_selection(parquet_selection);
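To make the proposed shape concrete, here is a minimal, self-contained sketch of what a ParquetSelection like the one above could hold: a map from row group index to the selectors for that row group. Everything here is hypothetical: `Selector` is a local stand-in for parquet's `RowSelector` (so the block compiles without the parquet crate), and `ParquetSelection`, `select`, and `get` are illustrative names, not an existing DataFusion API.

```rust
use std::collections::BTreeMap;

/// Local stand-in for parquet's `RowSelector` (hypothetical, so this compiles
/// without the parquet crate): skip or select a run of rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selector {
    Skip(usize),
    Select(usize),
}

/// Hypothetical `ParquetSelection`: row group index -> selectors for that row group.
#[derive(Debug, Default)]
pub struct ParquetSelection {
    row_groups: BTreeMap<usize, Vec<Selector>>,
}

impl ParquetSelection {
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder-style setter; a later call for the same row group replaces the earlier one.
    pub fn select(mut self, row_group: usize, selectors: Vec<Selector>) -> Self {
        self.row_groups.insert(row_group, selectors);
        self
    }

    /// The selectors recorded for `row_group`, if any.
    pub fn get(&self, row_group: usize) -> Option<&[Selector]> {
        self.row_groups.get(&row_group).map(Vec::as_slice)
    }
}
```

In this sketch a row group with no entry has no recorded selection; whether the exec should then read it in full or skip it is a design choice a real API would need to settle.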

Additional context

No response

@alamb alamb added the enhancement New feature or request label Apr 3, 2024
@alamb
Contributor Author

alamb commented Apr 3, 2024

FYI @westonpace, this is something we spoke about.

FYI @matthewmturner, who expressed interest in this: #9899 (comment)

@westonpace
Member

westonpace commented Apr 3, 2024

We wrote a (very crude) implementation of this as part of our parquet benchmarking. Sharing it here in case it is useful as a starting point for someone: https://gist.github.com/westonpace/04f33da51931e0a990997d9ac81b623c

P.S. The implementation assumes that the indices are sorted.

@westonpace
Member

The main reason I call it crude is that "give me rows with these offsets" feels like it should be part of the parquet reader API, and it feels like we are somewhat overusing the "skip" API. However, without upstream changes I don't know that one can do much better.

@westonpace
Member

Sorry for the stream-of-consciousness GitHub comments. Last comment (I hope). Reading through again, I see that you are suggesting the skip API be used in the exec node as well.

While this works, we have found that a more intuitive API is to pass in an iterator of desired row offsets instead of a RowSelection.
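A minimal sketch of one step such an offsets-based API might take internally, assuming the offsets are sorted, unique, and relative to a single row group: coalescing them into half-open ranges of consecutive rows, so that a run like 2, 3, 4 becomes one range instead of three single-row selections. `offsets_to_ranges` is a hypothetical helper, not part of parquet-rs or DataFusion.

```rust
use std::ops::Range;

/// Coalesce sorted, unique row offsets into half-open ranges of consecutive
/// rows, e.g. [2, 3, 4, 7] -> [2..5, 7..8].
pub fn offsets_to_ranges<I: IntoIterator<Item = usize>>(offsets: I) -> Vec<Range<usize>> {
    let mut ranges: Vec<Range<usize>> = Vec::new();
    for offset in offsets {
        if let Some(last) = ranges.last_mut() {
            assert!(offset >= last.end, "offsets must be sorted and unique");
            if offset == last.end {
                // directly follows the current run: extend it by one row
                last.end += 1;
                continue;
            }
        }
        // gap (or first offset): start a new single-row range
        ranges.push(offset..offset + 1);
    }
    ranges
}
```

Fewer, longer runs should translate into fewer skip/select selectors than emitting one single-row selection per offset, while callers keep the convenience of passing plain offsets.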

@alamb
Contributor Author

alamb commented Apr 3, 2024

Maybe we could have a version of the API like this (which translates into RowSelection) 🤔

let parquet_selection = ParquetSelection::new()
  // * rows 100-250 from row group 1
  .select_range(1, 100..250)
  // * rows 50-100 and 200-300 in row group 2
  .select_range(2, 50..100)
  .select_range(2, 200..300);
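One way such a select_range could translate into skip/select runs is sketched below, assuming the ranges for a row group are sorted, non-overlapping, and half-open (so `50..100` means rows 50 through 99). `Selector` is a local stand-in for parquet's `RowSelector`, and `ranges_to_selectors` is a hypothetical helper. For the row group 2 ranges `50..100` and `200..300`, it yields the same skip 50 / select 50 / skip 100 / select 100 sequence as the first proposal.

```rust
use std::ops::Range;

/// Local stand-in for parquet's `RowSelector` (hypothetical, so this compiles
/// without the parquet crate).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selector {
    Skip(usize),
    Select(usize),
}

/// Translate sorted, non-overlapping, half-open row ranges within one row group
/// into alternating skip/select runs. Adjacent ranges (e.g. 0..10, 10..20) are
/// merged into a single select run.
pub fn ranges_to_selectors(ranges: &[Range<usize>]) -> Vec<Selector> {
    let mut selectors = Vec::new();
    let mut next_row = 0; // first row not yet covered by a selector
    for range in ranges {
        if range.is_empty() {
            continue;
        }
        assert!(range.start >= next_row, "ranges must be sorted and non-overlapping");
        if range.start > next_row {
            // gap before this range: skip it, then start a new select run
            selectors.push(Selector::Skip(range.start - next_row));
            selectors.push(Selector::Select(range.len()));
        } else if let Some(Selector::Select(n)) = selectors.last_mut() {
            // range starts exactly where the previous one ended: extend that run
            *n += range.len();
        } else {
            // first range starts at row 0
            selectors.push(Selector::Select(range.len()));
        }
        next_row = range.end;
    }
    selectors
}
```

Rows after the last range are not covered by any selector; if the consumer requires the selection to span the whole row group, a trailing skip would need to be appended.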

@matthewmturner
Contributor

The RowSelection API is new to me; I'm going to spend some time reading up on it.

@Ted-Jiang
Member

Ted-Jiang commented Apr 7, 2024

In my mind, RowSelection is a file-level struct, and I think there could be multiple files in one ParquetExec, so ParquetSelection should be multi-file level, right?

BTW, if a user's custom index is file level like the page index, I think using RowSelection directly is an easier way 🤔

@alamb
Contributor Author

alamb commented Apr 7, 2024

> In my mind, RowSelection is a file-level struct, and I think there could be multiple files in one ParquetExec, so ParquetSelection should be multi-file level, right?

Yes, you are right. That is an excellent point.

> BTW, if a user's custom index is file level like the page index, I think using RowSelection directly is an easier way 🤔

I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.

FWIW, what I hope to do over the next few weeks is whip up a little POC demo showing how one might build a specialized index on top of parquet files, and then figure out what APIs would be needed in DataFusion.

@alamb alamb changed the title API in ParquetExec to pass in RowSelections to ParquetExec API in ParquetExec to pass in RowSelections to ParquetExec (enable custom indexes, finer grained pushdown) Apr 8, 2024
@waynexia
Member

> I think one challenge with using RowSelection is that it is relative to the pages (or maybe the row group), rather than the overall file.

IIRC, the parquet reader works with two levels of selection. First, a row group filter selects which row groups to fetch, and then the row selection tells which rows within those row groups to read.

It might not be very tricky to translate the "overall file selection" into parquet RowSelectors, since ParquetMetaData tells how many rows each row group contains.
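A sketch of that translation, assuming sorted file-level offsets and per-row-group row counts that have already been extracted (with a real file they would come from the row group metadata; here they are a plain slice). Walking the cumulative row counts, rather than dividing by a fixed row group size, also handles row groups of different sizes. `split_by_row_group` is a hypothetical helper.

```rust
use std::collections::BTreeMap;

/// Split sorted, file-level row offsets into per-row-group offsets.
/// `row_group_rows[i]` is the row count of row group `i`.
/// Returns row group index -> offsets relative to the start of that row group.
pub fn split_by_row_group(
    file_offsets: &[usize],
    row_group_rows: &[usize],
) -> BTreeMap<usize, Vec<usize>> {
    let mut result: BTreeMap<usize, Vec<usize>> = BTreeMap::new();
    let mut rg = 0; // current row group index
    let mut rg_start = 0; // file-level offset of the first row of `rg`
    for &offset in file_offsets {
        assert!(offset >= rg_start, "file offsets must be sorted");
        // Advance past every row group that ends at or before `offset`.
        while rg < row_group_rows.len() && offset >= rg_start + row_group_rows[rg] {
            rg_start += row_group_rows[rg];
            rg += 1;
        }
        assert!(rg < row_group_rows.len(), "offset {offset} is past the last row group");
        result.entry(rg).or_default().push(offset - rg_start);
    }
    result
}
```

For example, with row groups of 100, 50, and 200 rows, file offset 100 becomes offset 0 of row group 1 and file offset 349 becomes offset 199 of row group 2.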

@alamb
Contributor Author

alamb commented Apr 23, 2024

Here is an example (from @waynexia on Twitter) of using this kind of API: https://twitter.com/wayne17229928/status/1781997834356850945

https://github.com/GreptimeTeam/greptimedb/tree/tantivy-poc

@waynexia
Member

The related code is here: GreptimeTeam/greptimedb@9e1e4a5#diff-ac43dc13456cf41e4fabb9d577101e245366687d49064aff99bf10aab20b9cd0R429-R480

First, get the precise row numbers to read (at the file level):

let mut selected_row = applier.apply(file_id).unwrap();

Then translate the file-level row numbers into per-row-group selections:

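use std::collections::BTreeMap;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

// translate `selected_row` into row groups selection
// (assumes every row group except possibly the last has exactly `row_group_size` rows)
selected_row.sort_unstable();
// drop duplicates so `row_id - current_row` below cannot underflow
selected_row.dedup();
let mut row_groups_selected = BTreeMap::new();
for row_id in selected_row.iter() {
    let row_group_id = row_id / row_group_size;
    let rg_row_id = row_id % row_group_size;

    row_groups_selected
        .entry(row_group_id)
        .or_insert_with(Vec::new)
        .push(rg_row_id);
}
let row_group: BTreeMap<usize, Option<RowSelection>> = row_groups_selected
    .into_iter()
    .map(|(row_group_id, row_ids)| {
        let mut current_row = 0;
        let mut selection = vec![];
        for row_id in row_ids {
            // skip the gap since the previous selected row, then select this row
            selection.push(RowSelector::skip(row_id - current_row));
            selection.push(RowSelector::select(1));
            current_row = row_id + 1;
        }

        (row_group_id, Some(RowSelection::from(selection)))
    })
    .collect();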

The resulting BTreeMap<usize, Option<RowSelection>> maps each row group index to the selection within that row group.
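Such a per-row-group map still has to be turned into what the reader consumes. My understanding of the parquet-rs ArrowReaderBuilder docs is that `with_row_groups` picks the row groups and `with_row_selection` then takes a single selection covering only those row groups, back to back. Under that assumption, each row group's selectors must be padded to its full row count so the next row group starts aligned. The sketch below shows that flattening with a local stand-in `Selector` type and a hypothetical `flatten` helper; treating `None` as "read the whole row group" is an assumption about the map's meaning.

```rust
use std::collections::BTreeMap;

/// Local stand-in for parquet's `RowSelector` (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Selector {
    Skip(usize),
    Select(usize),
}

fn run_len(s: &Selector) -> usize {
    match s {
        Selector::Skip(n) | Selector::Select(n) => *n,
    }
}

/// Flatten row group index -> optional selectors (`None` = whole row group)
/// into (row groups to read, one selection spanning them back to back).
/// `row_group_rows[i]` is the row count of row group `i`.
pub fn flatten(
    per_rg: &BTreeMap<usize, Option<Vec<Selector>>>,
    row_group_rows: &[usize],
) -> (Vec<usize>, Vec<Selector>) {
    let mut row_groups = Vec::new();
    let mut selection = Vec::new();
    for (&rg, sel) in per_rg {
        let rows = row_group_rows[rg];
        row_groups.push(rg);
        match sel {
            None => selection.push(Selector::Select(rows)),
            Some(selectors) => {
                let covered: usize = selectors.iter().map(run_len).sum();
                assert!(covered <= rows, "selection longer than row group {rg}");
                selection.extend_from_slice(selectors);
                // pad so the next row group's selectors start at its first row
                if covered < rows {
                    selection.push(Selector::Skip(rows - covered));
                }
            }
        }
    }
    (row_groups, selection)
}
```

Row groups absent from the map are simply not read here, which matches the idea that the index already told us they contain no matching rows.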

@alamb
Contributor Author

alamb commented May 13, 2024

I hope to work on this issue this week

@alamb
Contributor Author

alamb commented May 20, 2024

Update: I found it was perhaps too large a step to go straight to row-level access, so instead I started with a basic example of building a file-level index: #10546 / #10549

Once that is looking good, I will make a more "advanced" example of building a row-group-level index that uses this API.

I will likely not be able to get to this item this week.

@alamb
Contributor Author

alamb commented Jun 3, 2024

Update: I have sketched out the API for specifying the selection here: #10738

@alamb
Contributor Author

alamb commented Jun 6, 2024

Here is a PR with the proposed API exposed: #10813

5 participants