diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs index dcecce51286c..77bb855ddf8c 100644 --- a/datafusion/datasource-parquet/src/opener.rs +++ b/datafusion/datasource-parquet/src/opener.rs @@ -26,10 +26,11 @@ use crate::{ }; use arrow::array::{RecordBatch, RecordBatchOptions}; use arrow::datatypes::DataType; -use datafusion_common::internal_datafusion_err; use datafusion_common::internal_err; use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener}; -use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer}; +use datafusion_datasource::morsel::{ + Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner, +}; use datafusion_physical_expr::projection::{ProjectionExprs, Projector}; use datafusion_physical_expr::utils::reassign_expr_columns; use datafusion_physical_expr_adapter::replace_columns_with_literals; @@ -39,8 +40,6 @@ use std::future::Future; use std::mem; use std::pin::Pin; use std::sync::Arc; -use std::sync::mpsc; -use std::sync::mpsc::{Receiver, TryRecvError}; use std::task::{Context, Poll}; use arrow::datatypes::{Schema, SchemaRef, TimeUnit}; @@ -430,8 +429,8 @@ impl ParquetOpenState { /// /// Implements state machine described in [`ParquetOpenState`] struct ParquetOpenFuture { - planner: Box, - pending_io: Option>>, + ready_planners: VecDeque>, + pending_io: VecDeque, ready_morsels: VecDeque>, } @@ -441,8 +440,8 @@ impl ParquetOpenFuture { partitioned_file: PartitionedFile, ) -> Result { Ok(Self { - planner: morselizer.plan_file(partitioned_file)?, - pending_io: None, + ready_planners: vec![morselizer.plan_file(partitioned_file)?].into(), + pending_io: VecDeque::new(), ready_morsels: VecDeque::new(), }) } @@ -453,10 +452,18 @@ impl Future for ParquetOpenFuture { fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { - // If waiting on IO, poll - if let Some(io_future) = self.pending_io.as_mut() { - ready!(io_future.poll_unpin(cx))?; - self.pending_io = None; + // If waiting on IO, poll the oldest blocked continuation until it + // yields the next CPU-ready planner. + if let Some(mut io_future) = self.pending_io.pop_front() { + match io_future.poll_unpin(cx) { + Poll::Pending => { + self.pending_io.push_front(io_future); + } + Poll::Ready(Ok(planner)) => { + self.ready_planners.push_back(planner); + } + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + } } // have a morsel ready to go, return that @@ -464,8 +471,18 @@ impl Future for ParquetOpenFuture { return Poll::Ready(Ok(morsel.into_stream())); } + // If all remaining work is blocked on I/O, wait for it to yield the + // next planner rather than incorrectly reporting an empty stream. + if self.ready_planners.is_empty() && !self.pending_io.is_empty() { + return Poll::Pending; + } + // Planner did not produce any stream (for example, it pruned the entire file) - let Some(mut plan) = self.planner.plan()? else { + let Some(planner) = self.ready_planners.pop_front() else { + return Poll::Ready(Ok(futures::stream::empty().boxed())); + }; + + let Some(mut plan) = planner.plan()? else { return Poll::Ready(Ok(futures::stream::empty().boxed())); }; @@ -477,9 +494,17 @@ impl Future for ParquetOpenFuture { } self.ready_morsels = plan.take_morsels().into(); - if let Some(io_future) = plan.take_io_future() { - self.pending_io = Some(io_future); + self.pending_io.push_back(io_future); + } + + if self.ready_morsels.is_empty() + && self.ready_planners.is_empty() + && self.pending_io.is_empty() + { + return Poll::Ready(internal_err!( + "planner returned an empty morsel plan" + )); } } } @@ -510,38 +535,13 @@ impl Morsel for ParquetStreamMorsel { } /// Stateful planner for opening a single parquet file via the morsel APIs. -enum ParquetMorselPlanner { - /// Ready to perform CPU-only planning work. - Ready(ParquetOpenState), - /// Waiting for an I/O future to produce the next planner state. - /// - /// Callers must not call [`MorselPlanner::plan`] again until the - /// corresponding I/O future has completed and its result is ready to - /// receive from the channel. - /// - /// Doing so is a protocol violation and transitions the planner to - /// [`ParquetMorselPlanner::Errored`]. - Waiting(Receiver>), - /// Actively planning (this state should be replaced by end of the call to plan) - Planning, - /// An earlier planning attempt returned an error. - Errored, -} +struct ParquetMorselPlanner(ParquetOpenState); impl fmt::Debug for ParquetMorselPlanner { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Self::Ready(state) => f - .debug_tuple("ParquetMorselPlanner::Ready") - .field(state) - .finish(), - Self::Waiting(_) => f - .debug_tuple("ParquetMorselPlanner::Waiting") - .field(&"") - .finish(), - Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(), - Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(), - } + f.debug_tuple("ParquetMorselPlanner") + .field(&self.0) + .finish() } } @@ -557,107 +557,66 @@ impl ParquetMorselPlanner { let state = ParquetOpenState::Start { prepared: Box::new(prepared), }; - Ok(Self::Ready(state)) + Ok(Self(state)) } /// Schedule an I/O future that resolves to the planner's next owned state. /// /// This helper /// - /// 1. creates a channel to send the next [`ParquetOpenState`] back to the - /// planner once the I/O future completes, + /// 1. packages the next [`ParquetOpenState`] into a future that yields the + /// next CPU-ready planner once the I/O future completes, /// - /// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`] + /// 2. moves the blocked continuation into that future rather than keeping a + /// `Waiting` planner state around, /// /// 3. returns a [`MorselPlan`] containing the boxed I/O future for the /// caller to poll. /// - fn schedule_io(&mut self, future: F) -> MorselPlan + fn schedule_io(future: F) -> MorselPlan where F: Future> + Send + 'static, { - let (output_for_future, output) = mpsc::channel(); - let io_future = async move { - let next_state = future.await?; - output_for_future.send(Ok(next_state)).map_err(|e| { - DataFusionError::Execution(format!("failed to send planner output: {e}")) - })?; - Ok(()) - } - .boxed(); - *self = ParquetMorselPlanner::Waiting(output); + let io_future = PendingMorselPlanner::new( + async move { Ok(Box::new(Self(future.await?)) as Box) } + .boxed(), + ); MorselPlan::new().with_io_future(io_future) } } impl MorselPlanner for ParquetMorselPlanner { - fn plan(&mut self) -> Result> { + fn plan(self: Box) -> Result> { + let mut current_state = self.0; + loop { - let planner = mem::replace(self, ParquetMorselPlanner::Planning); - let state = match planner { - ParquetMorselPlanner::Ready(state) => state, - ParquetMorselPlanner::Waiting(output) => { - output - .try_recv() - .map_err(|e| { - // IO wasn't done - *self = ParquetMorselPlanner::Errored; - match e { - TryRecvError::Empty => internal_datafusion_err!( - "planner polled before I/O completed" - ), - TryRecvError::Disconnected => internal_datafusion_err!( - "planner polled after I/O disconnected" - ), - } - })? - .inspect_err(|_| { - // IO completed successfully, but the IO was an error - *self = ParquetMorselPlanner::Errored; - })? - } - ParquetMorselPlanner::Planning => { - return internal_err!( - "ParquetMorselPlanner::plan was re-entered before previous plan completed" - ); - } - ParquetMorselPlanner::Errored => { - return internal_err!( - "ParquetMorselPlanner::plan called after a previous error" - ); - } - }; - // check for end of stream - if let ParquetOpenState::Done = state { - *self = ParquetMorselPlanner::Ready(ParquetOpenState::Done); + if let ParquetOpenState::Done = current_state { return Ok(None); - }; + } - let state = state.transition().inspect_err(|_| { - *self = ParquetMorselPlanner::Errored; - })?; + let state = current_state.transition()?; match state { #[cfg(feature = "parquet_encryption")] ParquetOpenState::LoadEncryption(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneFile(future.await?)) }))); } ParquetOpenState::LoadMetadata(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?))) }))); } ParquetOpenState::LoadPageIndex(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneWithStatistics(Box::new( future.await?, ))) }))); } ParquetOpenState::LoadBloomFilters(future) => { - return Ok(Some(self.schedule_io(async move { + return Ok(Some(Self::schedule_io(async move { Ok(ParquetOpenState::PruneWithBloomFilters(Box::new( future.await?, ))) @@ -670,7 +629,7 @@ impl MorselPlanner for ParquetMorselPlanner { } ParquetOpenState::Done => return Ok(None), cpu_state => { - *self = ParquetMorselPlanner::Ready(cpu_state); + current_state = cpu_state; } } } diff --git a/datafusion/datasource/src/morsel/mod.rs b/datafusion/datasource/src/morsel/mod.rs index 4150bc12b4b2..3939a6ba4dc8 100644 --- a/datafusion/datasource/src/morsel/mod.rs +++ b/datafusion/datasource/src/morsel/mod.rs @@ -28,6 +28,7 @@ use std::fmt::Debug; use crate::PartitionedFile; use arrow::array::RecordBatch; use datafusion_common::Result; +use futures::Future; use futures::future::BoxFuture; use futures::stream::BoxStream; @@ -81,9 +82,12 @@ pub trait MorselPlanner: Send + Debug { /// parquet metadata and evaluating pruning predicates. /// /// It should NOT do any I/O work, such as reading from the file. If I/O is - /// required, the returned [`MorselPlan`] should contain a future that the - /// caller polls to drive the I/O work to completion. Once the future is - /// complete, the caller can call `plan` again to get the next morsels. + /// required, the returned [`MorselPlan`] should contain a future that owns + /// the blocked continuation and resolves to the next [`MorselPlanner`]. + /// + /// Taking ownership of `self` encodes that contract in the type system: + /// once `plan` returns an `io_future`, there is no planner value left that + /// a caller could accidentally poll again before the I/O completes. /// /// Note this function is **not async** to make it explicitly clear that if /// I/O is required, it should be done in the returned `io_future`. @@ -98,7 +102,44 @@ pub trait MorselPlanner: Send + Debug { /// # Output Ordering /// /// See the comments on [`MorselPlan`] for the logical output order. - fn plan(&mut self) -> Result>; + fn plan(self: Box) -> Result>; +} + +/// A named future that owns the blocked continuation of a [`MorselPlanner`]. +/// +/// This is not just "some I/O future". It is the suspended remainder of the +/// planner state machine: once the required I/O completes, polling this future +/// yields the next CPU-ready planner. +/// +/// This avoids the previous runtime protocol of "planner is waiting, so don't +/// call `plan` again yet": the blocked continuation has moved into this future. +pub struct PendingMorselPlanner { + future: BoxFuture<'static, Result>>, +} + +impl PendingMorselPlanner { + /// Create a new blocked continuation future. + pub fn new(future: BoxFuture<'static, Result>>) -> Self { + Self { future } + } +} + +impl Debug for PendingMorselPlanner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PendingMorselPlanner") + .finish_non_exhaustive() + } +} + +impl Future for PendingMorselPlanner { + type Output = Result>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + self.future.as_mut().poll(cx) + } } /// Return result of [`MorselPlanner::plan`]. @@ -115,12 +156,13 @@ pub struct MorselPlan { morsels: Vec>, /// Any newly-created planners that are ready for CPU work. planners: Vec>, - /// A future that will drive any I/O work to completion. + /// A future that will drive any I/O work to completion and yield the next + /// CPU-ready planner. /// /// DataFusion will poll this future occasionally to drive the I/O work to - /// completion. Once the future resolves, DataFusion will call `plan` again - /// to get the next morsels. - io_future: Option>>, + /// completion. Once the future resolves, the returned planner is ready for + /// another call to `plan`. + io_future: Option, } impl MorselPlan { @@ -142,7 +184,7 @@ impl MorselPlan { } /// Set the pending I/O future. - pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self { + pub fn with_io_future(mut self, io_future: PendingMorselPlanner) -> Self { self.io_future = Some(io_future); self } @@ -158,12 +200,12 @@ impl MorselPlan { } /// Take the pending I/O future, if any. - pub fn take_io_future(&mut self) -> Option>> { + pub fn take_io_future(&mut self) -> Option { self.io_future.take() } /// Set the pending I/O future. - pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) { + pub fn set_io_future(&mut self, io_future: PendingMorselPlanner) { self.io_future = Some(io_future); }