Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 3 additions & 53 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

//! Defines common code used in execution plans

use super::{RecordBatchStream, SendableRecordBatchStream};
use super::SendableRecordBatchStream;
use crate::error::{DataFusionError, Result};
use crate::execution::context::TaskContext;
use crate::execution::memory_pool::MemoryReservation;
use crate::physical_plan::metrics::MemTrackingMetrics;
use crate::physical_plan::{displayable, ColumnStatistics, ExecutionPlan, Statistics};
use arrow::datatypes::{Schema, SchemaRef};
use arrow::datatypes::Schema;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::PhysicalSortExpr;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use futures::{Future, StreamExt, TryStreamExt};
use log::debug;
use parking_lot::Mutex;
use pin_project_lite::pin_project;
Expand All @@ -42,55 +41,6 @@ use tokio::task::JoinHandle;
/// [`MemoryReservation`] used across query execution streams
///
/// The reservation is wrapped in `Arc<Mutex<..>>` so a single reservation
/// can be held and updated from more than one owner.
pub(crate) type SharedMemoryReservation = Arc<Mutex<MemoryReservation>>;

/// Stream of record batches
///
/// Wraps an in-memory list of batches and yields them one at a time, in
/// order, reporting every poll to the attached [`MemTrackingMetrics`].
pub struct SizedRecordBatchStream {
/// Schema reported by `RecordBatchStream::schema`
schema: SchemaRef,
/// Batches to yield, in the order they are stored
batches: Vec<Arc<RecordBatch>>,
/// Position in `batches` of the next batch to yield
index: usize,
/// Metrics initialised with the total batch byte size and updated on every poll
metrics: MemTrackingMetrics,
}

impl SizedRecordBatchStream {
    /// Create a new `SizedRecordBatchStream` over `batches`.
    ///
    /// The byte sizes of all batches (as computed by `batch_byte_size`) are
    /// summed and handed to `metrics.init_mem_used` before the stream is
    /// returned. The stream starts positioned at the first batch.
    pub fn new(
        schema: SchemaRef,
        batches: Vec<Arc<RecordBatch>>,
        mut metrics: MemTrackingMetrics,
    ) -> Self {
        let total_bytes: usize = batches
            .iter()
            .map(|batch| batch_byte_size(batch))
            .sum();
        metrics.init_mem_used(total_bytes);

        Self {
            schema,
            batches,
            index: 0,
            metrics,
        }
    }
}

impl Stream for SizedRecordBatchStream {
    type Item = Result<RecordBatch>;

    /// Yields the next stored batch, or `None` once every batch has been
    /// returned. The stream is always ready; the task context is unused.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        _: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        // Clone the batch out of its `Arc` so the caller receives an owned value.
        let next = self
            .batches
            .get(self.index)
            .map(|batch| Ok(batch.as_ref().clone()));

        // Only advance when a batch was actually produced.
        if next.is_some() {
            self.index += 1;
        }

        // Every poll result is routed through the metrics recorder.
        self.metrics.record_poll(Poll::Ready(next))
    }
}

impl RecordBatchStream for SizedRecordBatchStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

/// Create a vector of record batches from a stream
pub async fn collect(stream: SendableRecordBatchStream) -> Result<Vec<RecordBatch>> {
stream.try_collect::<Vec<_>>().await
Expand Down
18 changes: 5 additions & 13 deletions datafusion/core/src/physical_plan/explain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,14 @@ use std::sync::Arc;
use crate::{
error::{DataFusionError, Result},
logical_expr::StringifiedPlan,
physical_plan::{
common::SizedRecordBatchStream, DisplayFormatType, ExecutionPlan, Partitioning,
Statistics,
},
physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning, Statistics},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use log::trace;

use super::{expressions::PhysicalSortExpr, SendableRecordBatchStream};
use crate::execution::context::TaskContext;
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use crate::physical_plan::stream::RecordBatchStreamAdapter;

/// Explain execution plan operator. This operator contains the string
/// values of the various plans it has when it is created, and passes
Expand Down Expand Up @@ -150,17 +147,12 @@ impl ExecutionPlan for ExplainExec {
],
)?;

let metrics = ExecutionPlanMetricsSet::new();
let tracking_metrics =
MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);

trace!(
"Before returning SizedRecordBatch in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());
"Before returning RecordBatchStream in ExplainExec::execute for partition {} of context session_id {} and task_id {:?}", partition, context.session_id(), context.task_id());

Ok(Box::pin(SizedRecordBatchStream::new(
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema.clone(),
vec![Arc::new(record_batch)],
tracking_metrics,
futures::stream::iter(vec![Ok(record_batch)]),
)))
}

Expand Down
21 changes: 8 additions & 13 deletions datafusion/core/tests/provider_filter_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ use datafusion::datasource::datasource::{TableProvider, TableType};
use datafusion::error::Result;
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};
Expand Down Expand Up @@ -56,7 +55,7 @@ fn create_batch(value: i32, num_rows: usize) -> Result<RecordBatch> {
#[derive(Debug)]
struct CustomPlan {
schema: SchemaRef,
batches: Vec<Arc<RecordBatch>>,
batches: Vec<RecordBatch>,
}

impl ExecutionPlan for CustomPlan {
Expand Down Expand Up @@ -89,16 +88,12 @@ impl ExecutionPlan for CustomPlan {

fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let metrics = ExecutionPlanMetricsSet::new();
let tracking_metrics =
MemTrackingMetrics::new(&metrics, context.memory_pool(), partition);
Ok(Box::pin(SizedRecordBatchStream::new(
Ok(Box::pin(RecordBatchStreamAdapter::new(
self.schema(),
self.batches.clone(),
tracking_metrics,
futures::stream::iter(self.batches.clone().into_iter().map(Ok)),
)))
}

Expand Down Expand Up @@ -183,8 +178,8 @@ impl TableProvider for CustomProvider {
Ok(Arc::new(CustomPlan {
schema: self.zero_batch.schema(),
batches: match int_value {
0 => vec![Arc::new(self.zero_batch.clone())],
1 => vec![Arc::new(self.one_batch.clone())],
0 => vec![self.zero_batch.clone()],
1 => vec![self.one_batch.clone()],
_ => vec![],
},
}))
Expand Down