-
I think you could do what you describe with `futures::stream::flatten` (to combine all the partition streams into a single stream) and then `for_each`:

https://docs.rs/futures/0.3.28/futures/stream/trait.StreamExt.html#method.flatten
https://docs.rs/futures/0.3.28/futures/stream/trait.StreamExt.html#method.for_each

Something like this (untested):

```rust
let streams = df.execute_stream_partitioned().await?;
// combine all partition streams into a single logical one
let stream = futures::stream::iter(streams).flatten();
// call the callback for each record batch as it arrives
stream
    .for_each(|batch| async move {
        // your per-batch callback here...
    })
    .await;
```
-
Actually, if you want the values from each partition as they come in (rather than in partition order), you may need to use `flatten_unordered`: https://docs.rs/futures/0.3.28/futures/stream/trait.StreamExt.html#method.flatten_unordered
-
I'm trying to query some Parquet files on S3, and use DataFusion's partitioned-streams API to pass all the record batches to a callback function as they become available, like below:
However, what I'd really like to do is get a stream of rows across all threads that match the query and pass them to the callback as soon as they're found, so the callback gets called with the row data every time a new row matches, instead of waiting for the partition to finish collecting rows.
Is this possible, and if so, how can I do it with DataFusion? From stepping into the execute_stream_partitioned() function, it looks like I may have to write my own physical plan and plug it into DataFusion, but that feels like a lot of work for a Rust/DataFusion newbie, so I'm hoping there's an easier way or an API I can hook into.
Thanks!