Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a RecordBatch::split to split large batches into a set of smaller batches #343

Closed
alamb opened this issue May 24, 2021 · 6 comments
Closed
Labels
enhancement Any new improvement worthy of a entry in the changelog good first issue Good for newcomers

Comments

@alamb
Copy link
Contributor

alamb commented May 24, 2021

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Sometimes it is advantageous to split one large RecordBatch into smaller batches for processing (for example, processing the multiple smaller RecordBatches in parallel)

So instead of 1 RecordBatch with 1M rows, we could have 100 RecordBatches with 10,000 rows each that could be processed in paralle.

@tustvold implemented such a function in https://github.com/apache/arrow-datafusion/pull/379/files

    fn split_batch(sorted: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {

Describe the solution you'd like
Port the split_batch function into RecordBatch::split(batch_size) or something similar and add appropriate tests

cc @jorgecarleitao @nevi-me

@alamb alamb added good first issue Good for newcomers enhancement Any new improvement worthy of a entry in the changelog labels May 24, 2021
@alamb alamb changed the title Add a RecordBatch::split Add a RecordBatch::split to split large batches into a set of smaller batches May 24, 2021
@Dandandan
Copy link
Contributor

We also have slice which already should make it quite easy to build this functionality (or use it to implement this functionality with).

@alamb
Copy link
Contributor Author

alamb commented May 24, 2021

We also have slice which already should make it quite easy to build this functionality (or use it to implement this functionality with).

Indeed -- I think that is how @tustvold implemented split_batch:

   fn split_batch(sorted: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
        let batches = (sorted.num_rows() + batch_size - 1) / batch_size;

        // Split the sorted RecordBatch into multiple
        (0..batches)
            .into_iter()
            .map(|batch_idx| {
                let columns = (0..sorted.num_columns())
                    .map(|column_idx| {
                        let length =
                            batch_size.min(sorted.num_rows() - batch_idx * batch_size);

                        sorted
                            .column(column_idx)
                            .slice(batch_idx * batch_size, length)
                    })
                    .collect();

                RecordBatch::try_new(sorted.schema(), columns).unwrap()
            })
            .collect()
    }

@nevi-me
Copy link
Contributor

nevi-me commented May 30, 2021

Reminder that slice currently doesn't work (or work correctly) for lists. So we have to be careful with how we use it.

It's sadly a limitation that @jorgecarleitao and I have encountered previously.

@alamb
Copy link
Contributor Author

alamb commented May 31, 2021

@nevi-me notes on #381

we would need to account for its individual array offsets, as there is never a guarantee that a record batch has all child arrays starting from the same offset.

@alamb
Copy link
Contributor Author

alamb commented Jun 16, 2021

#460 may be a better plan (RecordBatch::slice())

@alamb
Copy link
Contributor Author

alamb commented Jul 13, 2021

Given we now have slice in #460 I don't think this adds much anymore

@alamb alamb closed this as completed Jul 13, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Any new improvement worthy of a entry in the changelog good first issue Good for newcomers
Projects
None yet
Development

No branches or pull requests

3 participants