Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
171 changes: 65 additions & 106 deletions datafusion/datasource-parquet/src/opener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ use crate::{
};
use arrow::array::{RecordBatch, RecordBatchOptions};
use arrow::datatypes::DataType;
use datafusion_common::internal_datafusion_err;
use datafusion_common::internal_err;
use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
use datafusion_datasource::morsel::{Morsel, MorselPlan, MorselPlanner, Morselizer};
use datafusion_datasource::morsel::{
Morsel, MorselPlan, MorselPlanner, Morselizer, PendingMorselPlanner,
};
use datafusion_physical_expr::projection::{ProjectionExprs, Projector};
use datafusion_physical_expr::utils::reassign_expr_columns;
use datafusion_physical_expr_adapter::replace_columns_with_literals;
Expand All @@ -39,8 +40,6 @@ use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, TryRecvError};
use std::task::{Context, Poll};

use arrow::datatypes::{Schema, SchemaRef, TimeUnit};
Expand Down Expand Up @@ -430,8 +429,8 @@ impl ParquetOpenState {
///
/// Implements state machine described in [`ParquetOpenState`]
struct ParquetOpenFuture {
planner: Box<dyn MorselPlanner>,
pending_io: Option<BoxFuture<'static, Result<()>>>,
ready_planners: VecDeque<Box<dyn MorselPlanner>>,
pending_io: VecDeque<PendingMorselPlanner>,
ready_morsels: VecDeque<Box<dyn Morsel>>,
}

Expand All @@ -441,8 +440,8 @@ impl ParquetOpenFuture {
partitioned_file: PartitionedFile,
) -> Result<Self> {
Ok(Self {
planner: morselizer.plan_file(partitioned_file)?,
pending_io: None,
ready_planners: vec![morselizer.plan_file(partitioned_file)?].into(),
pending_io: VecDeque::new(),
ready_morsels: VecDeque::new(),
})
}
Expand All @@ -453,19 +452,37 @@ impl Future for ParquetOpenFuture {

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
// If waiting on IO, poll
if let Some(io_future) = self.pending_io.as_mut() {
ready!(io_future.poll_unpin(cx))?;
self.pending_io = None;
// If waiting on IO, poll the oldest blocked continuation until it
// yields the next CPU-ready planner.
if let Some(mut io_future) = self.pending_io.pop_front() {
match io_future.poll_unpin(cx) {
Poll::Pending => {
self.pending_io.push_front(io_future);
}
Poll::Ready(Ok(planner)) => {
self.ready_planners.push_back(planner);
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
}
}

// have a morsel ready to go, return that
if let Some(morsel) = self.ready_morsels.pop_front() {
return Poll::Ready(Ok(morsel.into_stream()));
}

// If all remaining work is blocked on I/O, wait for it to yield the
// next planner rather than incorrectly reporting an empty stream.
if self.ready_planners.is_empty() && !self.pending_io.is_empty() {
return Poll::Pending;
}

// Planner did not produce any stream (for example, it pruned the entire file)
let Some(mut plan) = self.planner.plan()? else {
let Some(planner) = self.ready_planners.pop_front() else {
return Poll::Ready(Ok(futures::stream::empty().boxed()));
};

let Some(mut plan) = planner.plan()? else {
return Poll::Ready(Ok(futures::stream::empty().boxed()));
};

Expand All @@ -477,9 +494,17 @@ impl Future for ParquetOpenFuture {
}

self.ready_morsels = plan.take_morsels().into();

if let Some(io_future) = plan.take_io_future() {
self.pending_io = Some(io_future);
self.pending_io.push_back(io_future);
}

if self.ready_morsels.is_empty()
&& self.ready_planners.is_empty()
&& self.pending_io.is_empty()
{
return Poll::Ready(internal_err!(
"planner returned an empty morsel plan"
));
}
}
}
Expand Down Expand Up @@ -510,38 +535,13 @@ impl Morsel for ParquetStreamMorsel {
}

/// Stateful planner for opening a single parquet file via the morsel APIs.
enum ParquetMorselPlanner {
/// Ready to perform CPU-only planning work.
Ready(ParquetOpenState),
/// Waiting for an I/O future to produce the next planner state.
///
/// Callers must not call [`MorselPlanner::plan`] again until the
/// corresponding I/O future has completed and its result is ready to
/// receive from the channel.
///
/// Doing so is a protocol violation and transitions the planner to
/// [`ParquetMorselPlanner::Errored`].
Waiting(Receiver<Result<ParquetOpenState>>),
/// Actively planning (this state should be replaced by end of the call to plan)
Planning,
/// An earlier planning attempt returned an error.
Errored,
}
struct ParquetMorselPlanner(ParquetOpenState);

impl fmt::Debug for ParquetMorselPlanner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Ready(state) => f
.debug_tuple("ParquetMorselPlanner::Ready")
.field(state)
.finish(),
Self::Waiting(_) => f
.debug_tuple("ParquetMorselPlanner::Waiting")
.field(&"<pending io>")
.finish(),
Self::Planning => f.debug_tuple("ParquetMorselPlanner::Planning").finish(),
Self::Errored => f.debug_tuple("ParquetMorselPlanner::Errored").finish(),
}
f.debug_tuple("ParquetMorselPlanner")
.field(&self.0)
.finish()
}
}

Expand All @@ -557,107 +557,66 @@ impl ParquetMorselPlanner {
let state = ParquetOpenState::Start {
prepared: Box::new(prepared),
};
Ok(Self::Ready(state))
Ok(Self(state))
}

/// Schedule an I/O future that resolves to the planner's next owned state.
///
/// This helper
///
/// 1. creates a channel to send the next [`ParquetOpenState`] back to the
/// planner once the I/O future completes,
/// 1. packages the next [`ParquetOpenState`] into a future that yields the
/// next CPU-ready planner once the I/O future completes,
///
/// 2. transitions the planner into [`ParquetMorselPlanner::Waiting`]
/// 2. moves the blocked continuation into that future rather than keeping a
/// `Waiting` planner state around,
///
/// 3. returns a [`MorselPlan`] containing the boxed I/O future for the
/// caller to poll.
///
fn schedule_io<F>(&mut self, future: F) -> MorselPlan
fn schedule_io<F>(future: F) -> MorselPlan
where
F: Future<Output = Result<ParquetOpenState>> + Send + 'static,
{
let (output_for_future, output) = mpsc::channel();
let io_future = async move {
let next_state = future.await?;
output_for_future.send(Ok(next_state)).map_err(|e| {
DataFusionError::Execution(format!("failed to send planner output: {e}"))
})?;
Ok(())
}
.boxed();
*self = ParquetMorselPlanner::Waiting(output);
let io_future = PendingMorselPlanner::new(
async move { Ok(Box::new(Self(future.await?)) as Box<dyn MorselPlanner>) }
.boxed(),
);
MorselPlan::new().with_io_future(io_future)
}
}

impl MorselPlanner for ParquetMorselPlanner {
fn plan(&mut self) -> Result<Option<MorselPlan>> {
fn plan(self: Box<Self>) -> Result<Option<MorselPlan>> {
let mut current_state = self.0;

loop {
let planner = mem::replace(self, ParquetMorselPlanner::Planning);
let state = match planner {
ParquetMorselPlanner::Ready(state) => state,
ParquetMorselPlanner::Waiting(output) => {
output
.try_recv()
.map_err(|e| {
// IO wasn't done
*self = ParquetMorselPlanner::Errored;
match e {
TryRecvError::Empty => internal_datafusion_err!(
"planner polled before I/O completed"
),
TryRecvError::Disconnected => internal_datafusion_err!(
"planner polled after I/O disconnected"
),
}
})?
.inspect_err(|_| {
// IO completed successfully, but the IO was an error
*self = ParquetMorselPlanner::Errored;
})?
}
ParquetMorselPlanner::Planning => {
return internal_err!(
"ParquetMorselPlanner::plan was re-entered before previous plan completed"
);
}
ParquetMorselPlanner::Errored => {
return internal_err!(
"ParquetMorselPlanner::plan called after a previous error"
);
}
};
// check for end of stream
if let ParquetOpenState::Done = state {
*self = ParquetMorselPlanner::Ready(ParquetOpenState::Done);
if let ParquetOpenState::Done = current_state {
return Ok(None);
};
}

let state = state.transition().inspect_err(|_| {
*self = ParquetMorselPlanner::Errored;
})?;
let state = current_state.transition()?;

match state {
#[cfg(feature = "parquet_encryption")]
ParquetOpenState::LoadEncryption(future) => {
return Ok(Some(self.schedule_io(async move {
return Ok(Some(Self::schedule_io(async move {
Ok(ParquetOpenState::PruneFile(future.await?))
})));
}
ParquetOpenState::LoadMetadata(future) => {
return Ok(Some(self.schedule_io(async move {
return Ok(Some(Self::schedule_io(async move {
Ok(ParquetOpenState::PrepareFilters(Box::new(future.await?)))
})));
}
ParquetOpenState::LoadPageIndex(future) => {
return Ok(Some(self.schedule_io(async move {
return Ok(Some(Self::schedule_io(async move {
Ok(ParquetOpenState::PruneWithStatistics(Box::new(
future.await?,
)))
})));
}
ParquetOpenState::LoadBloomFilters(future) => {
return Ok(Some(self.schedule_io(async move {
return Ok(Some(Self::schedule_io(async move {
Ok(ParquetOpenState::PruneWithBloomFilters(Box::new(
future.await?,
)))
Expand All @@ -670,7 +629,7 @@ impl MorselPlanner for ParquetMorselPlanner {
}
ParquetOpenState::Done => return Ok(None),
cpu_state => {
*self = ParquetMorselPlanner::Ready(cpu_state);
current_state = cpu_state;
}
}
}
Expand Down
64 changes: 53 additions & 11 deletions datafusion/datasource/src/morsel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::fmt::Debug;
use crate::PartitionedFile;
use arrow::array::RecordBatch;
use datafusion_common::Result;
use futures::Future;
use futures::future::BoxFuture;
use futures::stream::BoxStream;

Expand Down Expand Up @@ -81,9 +82,12 @@ pub trait MorselPlanner: Send + Debug {
/// parquet metadata and evaluating pruning predicates.
///
/// It should NOT do any I/O work, such as reading from the file. If I/O is
/// required, the returned [`MorselPlan`] should contain a future that the
/// caller polls to drive the I/O work to completion. Once the future is
/// complete, the caller can call `plan` again to get the next morsels.
/// required, the returned [`MorselPlan`] should contain a future that owns
/// the blocked continuation and resolves to the next [`MorselPlanner`].
///
/// Taking ownership of `self` encodes that contract in the type system:
/// once `plan` returns an `io_future`, there is no planner value left that
/// a caller could accidentally poll again before the I/O completes.
///
/// Note this function is **not async** to make it explicitly clear that if
/// I/O is required, it should be done in the returned `io_future`.
Expand All @@ -98,7 +102,44 @@ pub trait MorselPlanner: Send + Debug {
/// # Output Ordering
///
/// See the comments on [`MorselPlan`] for the logical output order.
fn plan(&mut self) -> Result<Option<MorselPlan>>;
fn plan(self: Box<Self>) -> Result<Option<MorselPlan>>;
}

/// A named future that owns the blocked continuation of a [`MorselPlanner`].
///
/// This is not just "some I/O future". It is the suspended remainder of the
/// planner state machine: once the required I/O completes, polling this future
/// yields the next CPU-ready planner.
///
/// This avoids the previous runtime protocol of "planner is waiting, so don't
/// call `plan` again yet": the blocked continuation has moved into this future.
///
/// Because the continuation lives only inside this value, dropping it without
/// polling it to completion means the next planner is never produced. The
/// `#[must_use]` attribute makes the compiler warn about that mistake.
#[must_use = "futures do nothing unless polled; dropping this discards the planner continuation"]
pub struct PendingMorselPlanner {
    /// Boxed continuation that resolves to the next planner, or to the error
    /// produced while waiting for it.
    future: BoxFuture<'static, Result<Box<dyn MorselPlanner>>>,
}

impl PendingMorselPlanner {
/// Create a new blocked continuation future.
pub fn new(future: BoxFuture<'static, Result<Box<dyn MorselPlanner>>>) -> Self {
Self { future }
}
}

impl Debug for PendingMorselPlanner {
    /// Formats as an opaque `PendingMorselPlanner { .. }`; the wrapped future
    /// is intentionally not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = f.debug_struct("PendingMorselPlanner");
        // No fields are added, so the output is just the type name followed
        // by the non-exhaustive marker.
        builder.finish_non_exhaustive()
    }
}

impl Future for PendingMorselPlanner {
    /// The next planner, or the error produced while waiting for it.
    type Output = Result<Box<dyn MorselPlanner>>;

    /// Delegates directly to the wrapped boxed future.
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        // The only field is an already-pinned box, so the wrapper is `Unpin`
        // and can be unwrapped to a plain mutable reference.
        let this = self.get_mut();
        this.future.as_mut().poll(cx)
    }
}

/// Return result of [`MorselPlanner::plan`].
Expand All @@ -115,12 +156,13 @@ pub struct MorselPlan {
morsels: Vec<Box<dyn Morsel>>,
/// Any newly-created planners that are ready for CPU work.
planners: Vec<Box<dyn MorselPlanner>>,
/// A future that will drive any I/O work to completion.
/// A future that will drive any I/O work to completion and yield the next
/// CPU-ready planner.
///
/// DataFusion will poll this future occasionally to drive the I/O work to
/// completion. Once the future resolves, DataFusion will call `plan` again
/// to get the next morsels.
io_future: Option<BoxFuture<'static, Result<()>>>,
/// completion. Once the future resolves, the returned planner is ready for
/// another call to `plan`.
io_future: Option<PendingMorselPlanner>,
}

impl MorselPlan {
Expand All @@ -142,7 +184,7 @@ impl MorselPlan {
}

/// Set the pending I/O future.
pub fn with_io_future(mut self, io_future: BoxFuture<'static, Result<()>>) -> Self {
pub fn with_io_future(mut self, io_future: PendingMorselPlanner) -> Self {
self.io_future = Some(io_future);
self
}
Expand All @@ -158,12 +200,12 @@ impl MorselPlan {
}

/// Take the pending I/O future, if any.
pub fn take_io_future(&mut self) -> Option<BoxFuture<'static, Result<()>>> {
pub fn take_io_future(&mut self) -> Option<PendingMorselPlanner> {
self.io_future.take()
}

/// Set the pending I/O future.
pub fn set_io_future(&mut self, io_future: BoxFuture<'static, Result<()>>) {
pub fn set_io_future(&mut self, io_future: PendingMorselPlanner) {
self.io_future = Some(io_future);
}

Expand Down