Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement streaming versions of Dataframe.collect methods #789

Merged
merged 23 commits into from
Jul 30, 2021

Conversation

andygrove
Copy link
Member

@andygrove andygrove commented Jul 27, 2021

Which issue does this PR close?

Closes #47.

Rationale for this change

In addition to the current collect* methods that load results into memory in a Vec<RecordBatch> this PR adds alternate execute_stream* methods that return streams instead so that results don't have to be loaded into memory before being processed.

What changes are included in this PR?

New execute_stream and execute_stream_partitioned methods on DataFrame.

Are there any user-facing changes?

Yes, new DataFrame methods.

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Jul 27, 2021
@andygrove andygrove self-assigned this Jul 27, 2021
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the idea looks very nice 👍

/// # Ok(())
/// # }
/// ```
async fn collect_stream(&self) -> Result<SendableRecordBatchStream>;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we called this something like execute rather than collect_stream?

    async fn execute_stream(&self) -> Result<SendableRecordBatchStream>;

This would mirror the naming of ExecutionPlan::execute and might make it clearer that collect means collect into a Vec and execute means get a stream

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. I renamed these to execute_stream and execute_stream_partitioned


/// Convert the logical plan represented by this DataFrame into a physical plan and
/// execute it, collecting all resulting batches into memory while maintaining
/// partitioning
async fn collect_partitioned(&self) -> Result<Vec<Vec<RecordBatch>>> {
let state = self.ctx_state.lock().unwrap().clone();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could probably rewrite collect_partitioned to be in terms of collect_stream_partitioned:

collect(self.collect_stream_partitioned().await?)

or something like that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've cleaned the code up and removed a fair bit of duplication now.

@andygrove andygrove changed the title WIP: Implement streaming versions of Dataframe.collect methods Implement streaming versions of Dataframe.collect methods Jul 28, 2021
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a really nice change to me

@andygrove andygrove merged commit d637871 into apache:master Jul 30, 2021
@andygrove andygrove deleted the dataframe-collect-stream branch July 30, 2021 17:30
@houqp houqp added the enhancement New feature or request label Jul 31, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DataFrame.collect() should return async stream rather than a Vec<RecordBatch>
3 participants