From bf58d3ef55d12100f77f00872d3bee477fb1caff Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Nov 2023 18:01:32 +0000 Subject: [PATCH 01/13] Implement FIFO using extension points (#7994) --- datafusion/core/tests/fifo.rs | 416 +++++++++++++++++++++------------- 1 file changed, 254 insertions(+), 162 deletions(-) diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 7d9ea97f7b5b..bc5b53c68ad0 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -17,32 +17,208 @@ //! This test demonstrates the DataFusion FIFO capabilities. //! -#[cfg(not(target_os = "windows"))] +#[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { use arrow::array::Array; - use arrow::csv::ReaderBuilder; + use arrow::csv::{ReaderBuilder, WriterBuilder}; use arrow::datatypes::{DataType, Field, Schema}; - use datafusion::test_util::register_unbounded_file_with_ordering; + use arrow_array::RecordBatch; + use arrow_schema::{SchemaRef, SortOptions}; + use async_trait::async_trait; + use datafusion::datasource::provider::TableProviderFactory; + use datafusion::datasource::TableProvider; + use datafusion::execution::context::SessionState; use datafusion::{ + physical_plan, prelude::{CsvReadOptions, SessionConfig, SessionContext}, test_util::{aggr_test_schema, arrow_test_data}, }; use datafusion_common::{exec_err, DataFusionError, Result}; + use datafusion_execution::runtime_env::RuntimeEnv; + use datafusion_execution::{SendableRecordBatchStream, TaskContext}; + use datafusion_expr::{CreateExternalTable, Expr, TableType}; + use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion_physical_plan::common::AbortOnDropSingle; + use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; + use datafusion_physical_plan::metrics::MetricsSet; + use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; + use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; + use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use futures::StreamExt; - use itertools::enumerate; use nix::sys::stat; use nix::unistd; - use rstest::*; + use std::any::Any; + use std::collections::HashMap; + use std::fmt::Formatter; use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::PathBuf; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; - use std::thread; - use std::thread::JoinHandle; - use std::time::{Duration, Instant}; use tempfile::TempDir; + use tokio::task::{spawn_blocking, JoinHandle}; + + #[derive(Default)] + struct FifoFactory {} + + #[async_trait] + impl TableProviderFactory for FifoFactory { + async fn create( + &self, + _state: &SessionState, + cmd: &CreateExternalTable, + ) -> Result> { + let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); + let location = cmd.location.clone(); + Ok(fifo_table(schema, location, None)) + } + } + + #[derive(Debug)] + struct FifoConfig { + schema: SchemaRef, + location: PathBuf, + sort: Option, + } + + struct FifoTable(Arc); + + #[async_trait] + impl TableProvider for FifoTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.0.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(StreamingTableExec::try_new( + self.0.schema.clone(), + vec![Arc::new(FifoRead(self.0.clone())) as _], + projection, + 
self.0.sort.clone(), + true, + )?)) + } + + async fn insert_into( + &self, + _state: &SessionState, + input: Arc, + _overwrite: bool, + ) -> Result> { + let ordering = match &self.0.sort { + Some(order) => Some(order.iter().map(|e| e.clone().into()).collect()), + None => None, + }; + + Ok(Arc::new(FileSinkExec::new( + input, + Arc::new(FifoWrite(self.0.clone())), + self.0.schema.clone(), + ordering, + ))) + } + } + + struct FifoRead(Arc); + + impl PartitionStream for FifoRead { + fn schema(&self) -> &SchemaRef { + &self.0.schema + } + + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + let config = self.0.clone(); + let schema = self.0.schema.clone(); + let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); + let tx = builder.tx(); + builder.spawn_blocking(move || { + let file = File::open(&config.location)?; + let reader = ReaderBuilder::new(config.schema.clone()).build(file)?; + for b in reader { + if tx.blocking_send(b.map_err(Into::into)).is_err() { + break; + } + } + Ok(()) + }); + builder.build() + } + } + + #[derive(Debug)] + struct FifoWrite(Arc); + + impl DisplayAs for FifoWrite { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{self:?}") + } + } + + #[async_trait] + impl DataSink for FifoWrite { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option { + None + } + + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + _context: &Arc, + ) -> Result { + let config = self.0.clone(); + let (sender, mut receiver) = tokio::sync::mpsc::channel::(2); + // Note: FIFO Files support poll so this could use AsyncFd + let write = AbortOnDropSingle::new(spawn_blocking(move || { + let file = OpenOptions::new().write(true).open(&config.location)?; + let mut count = 0_u64; + let mut writer = WriterBuilder::new().with_header(false).build(file); + while let Some(batch) = receiver.blocking_recv() { + count += batch.num_rows() as u64; + writer.write(&batch)?; + } + Ok(count) + })); + + while let Some(b) = data.next().await.transpose()? { + if sender.send(b).await.is_err() { + break; + } + } + drop(sender); + write.await.unwrap() + } + } + + /// Makes a TableProvider for a fifo file + fn fifo_table( + schema: SchemaRef, + path: impl Into, + sort: Option, + ) -> Arc { + Arc::new(FifoTable(Arc::new(FifoConfig { + schema, + sort, + location: path.into(), + }))) + } // ! For the sake of the test, do not alter the numbers. ! // Session batch size @@ -64,48 +240,20 @@ mod unix_test { } } - fn write_to_fifo( - mut file: &File, - line: &str, - ref_time: Instant, - broken_pipe_timeout: Duration, - ) -> Result<()> { - // We need to handle broken pipe error until the reader is ready. This - // is why we use a timeout to limit the wait duration for the reader. - // If the error is different than broken pipe, we fail immediately. - while let Err(e) = file.write_all(line.as_bytes()) { - if e.raw_os_error().unwrap() == 32 { - let interval = Instant::now().duration_since(ref_time); - if interval < broken_pipe_timeout { - thread::sleep(Duration::from_millis(100)); - continue; - } - } - return exec_err!("{}", e); - } - Ok(()) - } - // This test provides a relatively realistic end-to-end scenario where // we swap join sides to accommodate a FIFO source. 
- #[rstest] - #[timeout(std::time::Duration::from_secs(30))] - #[tokio::test(flavor = "multi_thread", worker_threads = 8)] - async fn unbounded_file_with_swapped_join( - #[values(true, false)] unbounded_file: bool, - ) -> Result<()> { + #[tokio::test] + async fn unbounded_file_with_swapped_join() -> Result<()> { // Create session context let config = SessionConfig::new() .with_batch_size(TEST_BATCH_SIZE) .with_collect_statistics(false) .with_target_partitions(1); + let ctx = SessionContext::new_with_config(config); - // To make unbounded deterministic - let waiting = Arc::new(AtomicBool::new(unbounded_file)); // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; - let fifo_path = - create_fifo_file(&tmp_dir, &format!("fifo_{unbounded_file:?}.csv"))?; + let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?; // Execution can calculated at least one RecordBatch after the number of // "joinable_lines_length" lines are read. let joinable_lines_length = @@ -124,28 +272,18 @@ mod unix_test { .map(|(a1, a2)| format!("{a1},{a2}\n")) .collect::>(); // Create writing threads for the left and right FIFO files - let task = create_writing_thread( - fifo_path.clone(), - "a1,a2\n".to_owned(), - lines, - waiting.clone(), - joinable_lines_length, - ); + let task = create_writing_thread(fifo_path.clone(), lines); // Data Schema let schema = Arc::new(Schema::new(vec![ Field::new("a1", DataType::Utf8, false), Field::new("a2", DataType::UInt32, false), ])); + // Create a file with bounded or unbounded flag. - ctx.register_csv( - "left", - fifo_path.as_os_str().to_str().unwrap(), - CsvReadOptions::new() - .schema(schema.as_ref()) - .mark_infinite(unbounded_file), - ) - .await?; + let provider = fifo_table(schema, fifo_path, None); + ctx.register_table("left", provider).unwrap(); + // Register right table let schema = aggr_test_schema(); let test_data = arrow_test_data(); @@ -158,10 +296,8 @@ mod unix_test { // Execute the query let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?; let mut stream = df.execute_stream().await?; - while (stream.next().await).is_some() { - waiting.store(false, Ordering::SeqCst); - } - task.join().unwrap(); + while (stream.next().await).is_some() {} + task.await.unwrap(); Ok(()) } @@ -172,39 +308,20 @@ mod unix_test { Equal, } - fn create_writing_thread( - file_path: PathBuf, - header: String, - lines: Vec, - waiting_lock: Arc, - wait_until: usize, - ) -> JoinHandle<()> { - // Timeout for a long period of BrokenPipe error - let broken_pipe_timeout = Duration::from_secs(10); - // Spawn a new thread to write to the FIFO file - thread::spawn(move || { - let file = OpenOptions::new().write(true).open(file_path).unwrap(); - // Reference time to use when deciding to fail the test - let execution_start = Instant::now(); - write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap(); - for (cnt, line) in enumerate(lines) { - while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until { - thread::sleep(Duration::from_millis(50)); - } - write_to_fifo(&file, &line, execution_start, broken_pipe_timeout) - .unwrap(); + fn create_writing_thread(file_path: PathBuf, lines: Vec) -> JoinHandle<()> { + spawn_blocking(move || { + let mut file = OpenOptions::new().write(true).open(file_path).unwrap(); + for line in &lines { + file.write_all(line.as_bytes()).unwrap() } - drop(file); + file.flush().unwrap(); }) } // This test provides a relatively realistic end-to-end scenario where // we change the join into a 
[SymmetricHashJoin] to accommodate two // unbounded (FIFO) sources. - #[rstest] - #[timeout(std::time::Duration::from_secs(30))] - #[tokio::test(flavor = "multi_thread")] - #[ignore] + #[tokio::test] async fn unbounded_file_with_symmetric_join() -> Result<()> { // Create session context let config = SessionConfig::new() @@ -212,8 +329,6 @@ mod unix_test { .set_bool("datafusion.execution.coalesce_batches", false) .with_target_partitions(1); let ctx = SessionContext::new_with_config(config); - // Tasks - let mut tasks: Vec> = vec![]; // Join filter let a1_iter = 0..TEST_DATA_SIZE; @@ -230,78 +345,54 @@ mod unix_test { let left_fifo = create_fifo_file(&tmp_dir, "left.csv")?; // Create a FIFO file for the right input source. let right_fifo = create_fifo_file(&tmp_dir, "right.csv")?; - // Create a mutex for tracking if the right input source is waiting for data. - let waiting = Arc::new(AtomicBool::new(true)); // Create writing threads for the left and right FIFO files - tasks.push(create_writing_thread( - left_fifo.clone(), - "a1,a2\n".to_owned(), - lines.clone(), - waiting.clone(), - TEST_BATCH_SIZE, - )); - tasks.push(create_writing_thread( - right_fifo.clone(), - "a1,a2\n".to_owned(), - lines.clone(), - waiting.clone(), - TEST_BATCH_SIZE, - )); + let tasks = vec![ + create_writing_thread(left_fifo.clone(), lines.clone()), + create_writing_thread(right_fifo.clone(), lines.clone()), + ]; // Create schema let schema = Arc::new(Schema::new(vec![ Field::new("a1", DataType::UInt32, false), Field::new("a2", DataType::UInt32, false), ])); + // Specify the ordering: - let file_sort_order = vec![[datafusion_expr::col("a1")] - .into_iter() - .map(|e| { - let ascending = true; - let nulls_first = false; - e.sort(ascending, nulls_first) - }) - .collect::>()]; + let order = Some(vec![PhysicalSortExpr { + expr: physical_plan::expressions::col("a1", schema.as_ref())?, + options: SortOptions { + descending: false, + nulls_first: false, + }, + }]); + // Set unbounded sorted files read configuration - register_unbounded_file_with_ordering( - &ctx, - schema.clone(), - &left_fifo, - "left", - file_sort_order.clone(), - true, - ) - .await?; - register_unbounded_file_with_ordering( - &ctx, - schema, - &right_fifo, - "right", - file_sort_order, - true, - ) - .await?; + let provider = fifo_table(schema.clone(), left_fifo, order.clone()); + ctx.register_table("left", provider)?; + + let provider = fifo_table(schema.clone(), right_fifo, order); + ctx.register_table("right", provider)?; + // Execute the query, with no matching rows. (since key is modulus 10) let df = ctx .sql( "SELECT - t1.a1, - t1.a2, - t2.a1, - t2.a2 - FROM - left as t1 FULL - JOIN right as t2 ON t1.a2 = t2.a2 - AND t1.a1 > t2.a1 + 4 - AND t1.a1 < t2.a1 + 9", + t1.a1, + t1.a2, + t2.a1, + t2.a2 + FROM + left as t1 FULL + JOIN right as t2 ON t1.a2 = t2.a2 + AND t1.a1 > t2.a1 + 4 + AND t1.a1 < t2.a1 + 9", ) .await?; let mut stream = df.execute_stream().await?; let mut operations = vec![]; // Partial. 
while let Some(Ok(batch)) = stream.next().await { - waiting.store(false, Ordering::SeqCst); let left_unmatched = batch.column(2).null_count(); let right_unmatched = batch.column(0).null_count(); let op = if left_unmatched == 0 && right_unmatched == 0 { @@ -313,7 +404,8 @@ mod unix_test { }; operations.push(op); } - tasks.into_iter().for_each(|jh| jh.join().unwrap()); + futures::future::try_join_all(tasks).await.unwrap(); + // The SymmetricHashJoin executor produces FULL join results at every // pruning, which happens before it reaches the end of input and more // than once. In this test, we feed partially joinable data to both @@ -337,12 +429,15 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] async fn test_sql_insert_into_fifo() -> Result<()> { - // To make unbounded deterministic - let waiting = Arc::new(AtomicBool::new(true)); - let waiting_thread = waiting.clone(); // create local execution context + let runtime = Arc::new(RuntimeEnv::default()); let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE); - let ctx = SessionContext::new_with_config(config); + let mut state = SessionState::new_with_config_rt(config, runtime); + let mut factories = HashMap::with_capacity(1); + factories.insert("CSV".to_string(), Arc::new(FifoFactory::default()) as _); + *state.table_factories_mut() = factories; + let ctx = SessionContext::new_with_state(state); + // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; let source_fifo_path = create_fifo_file(&tmp_dir, "source.csv")?; @@ -356,20 +451,18 @@ mod unix_test { // thread. This approach ensures that the pipeline remains unbroken. tasks.push(create_writing_thread( source_fifo_path_thread, - "a1,a2\n".to_owned(), (0..TEST_DATA_SIZE) .map(|_| "a,1\n".to_string()) .collect::>(), - waiting, - TEST_BATCH_SIZE, )); // Create a new temporary FIFO file let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?; // Prevent move let (sink_fifo_path_thread, sink_display_fifo_path) = (sink_fifo_path.clone(), sink_fifo_path.display()); + // Spawn a new thread to read sink EXTERNAL TABLE. 
- tasks.push(thread::spawn(move || { + tasks.push(spawn_blocking(move || { let file = File::open(sink_fifo_path_thread).unwrap(); let schema = Arc::new(Schema::new(vec![ Field::new("a1", DataType::Utf8, false), @@ -377,14 +470,18 @@ mod unix_test { ])); let mut reader = ReaderBuilder::new(schema) - .with_header(true) .with_batch_size(TEST_BATCH_SIZE) .build(file) .map_err(|e| DataFusionError::Internal(e.to_string())) .unwrap(); - while let Some(Ok(_)) = reader.next() { - waiting_thread.store(false, Ordering::SeqCst); + let mut remaining = TEST_DATA_SIZE; + + while let Some(Ok(b)) = reader.next() { + remaining = remaining.checked_sub(b.num_rows()).unwrap(); + if remaining == 0 { + break; + } } })); // register second csv file with the SQL (create an empty file if not found) @@ -394,8 +491,6 @@ mod unix_test { a2 INT NOT NULL ) STORED AS CSV - WITH HEADER ROW - OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{source_display_fifo_path}'" )) .await?; @@ -407,20 +502,17 @@ mod unix_test { a2 INT NOT NULL ) STORED AS CSV - WITH HEADER ROW - OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{sink_display_fifo_path}'" )) .await?; let df = ctx - .sql( - "INSERT INTO sink_table - SELECT a1, a2 FROM source_table", - ) + .sql("INSERT INTO sink_table SELECT a1, a2 FROM source_table") .await?; - df.collect().await?; - tasks.into_iter().for_each(|jh| jh.join().unwrap()); + + // Start execution + let _ = df.collect().await.unwrap(); + futures::future::try_join_all(tasks).await.unwrap(); Ok(()) } } From 30b69cff9abc460b8772c1d22fdfd4dc8072a224 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 1 Nov 2023 20:45:26 +0000 Subject: [PATCH 02/13] Clippy --- datafusion/core/tests/fifo.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index bc5b53c68ad0..fefa6c5aaee0 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -119,10 +119,8 @@ mod unix_test { input: Arc, _overwrite: bool, ) -> Result> { - let ordering = match &self.0.sort { - Some(order) => Some(order.iter().map(|e| e.clone().into()).collect()), - None => None, - }; + let sort = self.0.sort.as_ref(); + let ordering = sort.map(|o| o.iter().map(|e| e.clone().into()).collect()); Ok(Arc::new(FileSinkExec::new( input, From b01fbdf58dc8a5f1a7b2f57de6957102dccb47d0 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 08:38:34 +0000 Subject: [PATCH 03/13] Rename to StreamTable and make public --- datafusion/core/src/datasource/mod.rs | 1 + datafusion/core/src/datasource/stream.rs | 221 +++++++++++++++++++++++ datafusion/core/tests/fifo.rs | 197 ++------------------ 3 files changed, 242 insertions(+), 177 deletions(-) create mode 100644 datafusion/core/src/datasource/stream.rs diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 48e9d6992124..8db09e2333e1 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -29,6 +29,7 @@ pub mod memory; pub mod physical_plan; pub mod provider; mod statistics; +pub mod stream; pub mod streaming; pub mod view; diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs new file mode 100644 index 000000000000..b96c28ed3546 --- /dev/null +++ b/datafusion/core/src/datasource/stream.rs @@ -0,0 +1,221 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TableProvider for stream sources, such as FIFO files + +use crate::datasource::provider::TableProviderFactory; +use crate::datasource::TableProvider; +use crate::execution::context::SessionState; +use arrow::csv::{ReaderBuilder, WriterBuilder}; +use arrow_array::RecordBatch; +use arrow_schema::SchemaRef; +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_execution::{SendableRecordBatchStream, TaskContext}; +use datafusion_expr::{CreateExternalTable, Expr, TableType}; +use datafusion_physical_expr::LexOrdering; +use datafusion_physical_plan::common::AbortOnDropSingle; +use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; +use datafusion_physical_plan::metrics::MetricsSet; +use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; +use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; +use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; +use futures::StreamExt; +use std::any::Any; +use std::fmt::Formatter; +use std::fs::{File, OpenOptions}; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::task::spawn_blocking; + +/// A [`TableProviderFactory`] for [`StreamTable`] +#[derive(Default)] +pub struct StreamTableFactory {} + +#[async_trait] +impl TableProviderFactory for StreamTableFactory { + async fn create( + &self, + _state: &SessionState, + cmd: &CreateExternalTable, + ) -> Result> { + let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); + let location = cmd.location.clone(); + Ok(Arc::new(StreamTable(Arc::new(StreamConfig { + schema, + sort: None, // TODO + location: location.into(), + })))) + } +} + +/// The configuration for a [`StreamTable`] +#[derive(Debug)] +pub struct StreamConfig { + schema: SchemaRef, + location: PathBuf, + sort: Option, +} + +impl StreamConfig { + /// Stream data from the file at `location` + pub fn new_file(schema: SchemaRef, location: PathBuf) -> Self { + Self { + schema, + location, + sort: None, + } + } + + /// Specify a sort order for the stream + pub fn with_sort(mut self, sort: Option) -> Self { + self.sort = sort; + self + } +} + +/// A [`TableProvider`] for a stream source, such as a FIFO file +pub struct StreamTable(Arc); + +impl StreamTable { + /// Create a new [`StreamTable`] for the given `StreamConfig` + pub fn new(config: Arc) -> Self { + Self(config) + } +} + +#[async_trait] +impl TableProvider for StreamTable { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.0.schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::Temporary + } + + async fn scan( + &self, + _state: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> Result> { + Ok(Arc::new(StreamingTableExec::try_new( + self.0.schema.clone(), + vec![Arc::new(StreamRead(self.0.clone())) as _], 
+ projection, + self.0.sort.clone(), + true, + )?)) + } + + async fn insert_into( + &self, + _state: &SessionState, + input: Arc, + _overwrite: bool, + ) -> Result> { + let sort = self.0.sort.as_ref(); + let ordering = sort.map(|o| o.iter().map(|e| e.clone().into()).collect()); + + Ok(Arc::new(FileSinkExec::new( + input, + Arc::new(StreamWrite(self.0.clone())), + self.0.schema.clone(), + ordering, + ))) + } +} + +struct StreamRead(Arc); + +impl PartitionStream for StreamRead { + fn schema(&self) -> &SchemaRef { + &self.0.schema + } + + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + let config = self.0.clone(); + let schema = self.0.schema.clone(); + let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); + let tx = builder.tx(); + builder.spawn_blocking(move || { + let file = File::open(&config.location)?; + let reader = ReaderBuilder::new(config.schema.clone()).build(file)?; + for b in reader { + if tx.blocking_send(b.map_err(Into::into)).is_err() { + break; + } + } + Ok(()) + }); + builder.build() + } +} + +#[derive(Debug)] +struct StreamWrite(Arc); + +impl DisplayAs for StreamWrite { + fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +#[async_trait] +impl DataSink for StreamWrite { + fn as_any(&self) -> &dyn Any { + self + } + + fn metrics(&self) -> Option { + None + } + + async fn write_all( + &self, + mut data: SendableRecordBatchStream, + _context: &Arc, + ) -> Result { + let config = self.0.clone(); + let (sender, mut receiver) = tokio::sync::mpsc::channel::(2); + // Note: FIFO Files support poll so this could use AsyncFd + let write = AbortOnDropSingle::new(spawn_blocking(move || { + let file = OpenOptions::new().write(true).open(&config.location)?; + let mut count = 0_u64; + let mut writer = WriterBuilder::new().with_header(false).build(file); + while let Some(batch) = receiver.blocking_recv() { + count += batch.num_rows() as u64; + writer.write(&batch)?; + } + Ok(count) + })); + + while let Some(b) = data.next().await.transpose()? 
{ + if sender.send(b).await.is_err() { + break; + } + } + drop(sender); + write.await.unwrap() + } +} diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index fefa6c5aaee0..b420b47f5c3d 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -20,13 +20,23 @@ #[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { + use std::collections::HashMap; + use std::fs::{File, OpenOptions}; + use std::io::Write; + use std::path::PathBuf; + use std::sync::Arc; + use arrow::array::Array; - use arrow::csv::{ReaderBuilder, WriterBuilder}; + use arrow::csv::ReaderBuilder; use arrow::datatypes::{DataType, Field, Schema}; - use arrow_array::RecordBatch; use arrow_schema::{SchemaRef, SortOptions}; - use async_trait::async_trait; - use datafusion::datasource::provider::TableProviderFactory; + use futures::StreamExt; + use nix::sys::stat; + use nix::unistd; + use tempfile::TempDir; + use tokio::task::{spawn_blocking, JoinHandle}; + + use datafusion::datasource::stream::{StreamConfig, StreamTable, StreamTableFactory}; use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::{ @@ -36,174 +46,7 @@ mod unix_test { }; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_execution::runtime_env::RuntimeEnv; - use datafusion_execution::{SendableRecordBatchStream, TaskContext}; - use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; - use datafusion_physical_plan::common::AbortOnDropSingle; - use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; - use datafusion_physical_plan::metrics::MetricsSet; - use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; - use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; - use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; - use futures::StreamExt; - use nix::sys::stat; - use nix::unistd; - use std::any::Any; - use std::collections::HashMap; - use std::fmt::Formatter; - use std::fs::{File, OpenOptions}; - use std::io::Write; - use std::path::PathBuf; - use std::sync::Arc; - use tempfile::TempDir; - use tokio::task::{spawn_blocking, JoinHandle}; - - #[derive(Default)] - struct FifoFactory {} - - #[async_trait] - impl TableProviderFactory for FifoFactory { - async fn create( - &self, - _state: &SessionState, - cmd: &CreateExternalTable, - ) -> Result> { - let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); - let location = cmd.location.clone(); - Ok(fifo_table(schema, location, None)) - } - } - - #[derive(Debug)] - struct FifoConfig { - schema: SchemaRef, - location: PathBuf, - sort: Option, - } - - struct FifoTable(Arc); - - #[async_trait] - impl TableProvider for FifoTable { - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> SchemaRef { - self.0.schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::Temporary - } - - async fn scan( - &self, - _state: &SessionState, - projection: Option<&Vec>, - _filters: &[Expr], - _limit: Option, - ) -> Result> { - Ok(Arc::new(StreamingTableExec::try_new( - self.0.schema.clone(), - vec![Arc::new(FifoRead(self.0.clone())) as _], - projection, - self.0.sort.clone(), - true, - )?)) - } - - async fn insert_into( - &self, - _state: &SessionState, - input: Arc, - _overwrite: bool, - ) -> Result> { - let sort = self.0.sort.as_ref(); - let ordering = sort.map(|o| o.iter().map(|e| e.clone().into()).collect()); - - 
Ok(Arc::new(FileSinkExec::new( - input, - Arc::new(FifoWrite(self.0.clone())), - self.0.schema.clone(), - ordering, - ))) - } - } - - struct FifoRead(Arc); - - impl PartitionStream for FifoRead { - fn schema(&self) -> &SchemaRef { - &self.0.schema - } - - fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { - let config = self.0.clone(); - let schema = self.0.schema.clone(); - let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); - let tx = builder.tx(); - builder.spawn_blocking(move || { - let file = File::open(&config.location)?; - let reader = ReaderBuilder::new(config.schema.clone()).build(file)?; - for b in reader { - if tx.blocking_send(b.map_err(Into::into)).is_err() { - break; - } - } - Ok(()) - }); - builder.build() - } - } - - #[derive(Debug)] - struct FifoWrite(Arc); - - impl DisplayAs for FifoWrite { - fn fmt_as(&self, _t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result { - write!(f, "{self:?}") - } - } - - #[async_trait] - impl DataSink for FifoWrite { - fn as_any(&self) -> &dyn Any { - self - } - - fn metrics(&self) -> Option { - None - } - - async fn write_all( - &self, - mut data: SendableRecordBatchStream, - _context: &Arc, - ) -> Result { - let config = self.0.clone(); - let (sender, mut receiver) = tokio::sync::mpsc::channel::(2); - // Note: FIFO Files support poll so this could use AsyncFd - let write = AbortOnDropSingle::new(spawn_blocking(move || { - let file = OpenOptions::new().write(true).open(&config.location)?; - let mut count = 0_u64; - let mut writer = WriterBuilder::new().with_header(false).build(file); - while let Some(batch) = receiver.blocking_recv() { - count += batch.num_rows() as u64; - writer.write(&batch)?; - } - Ok(count) - })); - - while let Some(b) = data.next().await.transpose()? { - if sender.send(b).await.is_err() { - break; - } - } - drop(sender); - write.await.unwrap() - } - } /// Makes a TableProvider for a fifo file fn fifo_table( @@ -211,11 +54,8 @@ mod unix_test { path: impl Into, sort: Option, ) -> Arc { - Arc::new(FifoTable(Arc::new(FifoConfig { - schema, - sort, - location: path.into(), - }))) + let config = StreamConfig::new_file(schema, path.into()).with_sort(sort); + Arc::new(StreamTable::new(Arc::new(config))) } // ! For the sake of the test, do not alter the numbers. ! @@ -432,7 +272,10 @@ mod unix_test { let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE); let mut state = SessionState::new_with_config_rt(config, runtime); let mut factories = HashMap::with_capacity(1); - factories.insert("CSV".to_string(), Arc::new(FifoFactory::default()) as _); + factories.insert( + "CSV".to_string(), + Arc::new(StreamTableFactory::default()) as _, + ); *state.table_factories_mut() = factories; let ctx = SessionContext::new_with_state(state); From 281f2ef358a95dbc934025d6d3d27971b683c8a4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 09:00:14 +0000 Subject: [PATCH 04/13] Add StreamEncoding --- datafusion/core/src/datasource/stream.rs | 108 ++++++++++++++++++----- 1 file changed, 88 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index b96c28ed3546..2bfd669c5766 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -17,14 +17,21 @@ //! 
TableProvider for stream sources, such as FIFO files -use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; -use crate::execution::context::SessionState; -use arrow::csv::{ReaderBuilder, WriterBuilder}; -use arrow_array::RecordBatch; +use std::any::Any; +use std::fmt::Formatter; +use std::fs::{File, OpenOptions}; +use std::io::{BufReader, Read, Write}; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; + +use arrow_array::{RecordBatch, RecordBatchReader, RecordBatchWriter}; use arrow_schema::SchemaRef; use async_trait::async_trait; -use datafusion_common::Result; +use futures::StreamExt; +use tokio::task::spawn_blocking; + +use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; use datafusion_physical_expr::LexOrdering; @@ -34,13 +41,10 @@ use datafusion_physical_plan::metrics::MetricsSet; use datafusion_physical_plan::stream::RecordBatchReceiverStreamBuilder; use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; -use futures::StreamExt; -use std::any::Any; -use std::fmt::Formatter; -use std::fs::{File, OpenOptions}; -use std::path::PathBuf; -use std::sync::Arc; -use tokio::task::spawn_blocking; + +use crate::datasource::provider::TableProviderFactory; +use crate::datasource::TableProvider; +use crate::execution::context::SessionState; /// A [`TableProviderFactory`] for [`StreamTable`] #[derive(Default)] @@ -55,11 +59,67 @@ impl TableProviderFactory for StreamTableFactory { ) -> Result> { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); let location = cmd.location.clone(); - Ok(Arc::new(StreamTable(Arc::new(StreamConfig { - schema, - sort: None, // TODO - location: location.into(), - })))) + let encoding = cmd.file_type.parse()?; + let config = + StreamConfig::new_file(schema, location.into()).with_encoding(encoding); + + Ok(Arc::new(StreamTable(Arc::new(config)))) + } +} + +/// The data encoding for [`StreamTable`] +#[derive(Debug, Clone)] +pub enum StreamEncoding { + /// CSV records + Csv, + /// Newline-delimited JSON records + Json, +} + +impl StreamEncoding { + fn reader( + &self, + schema: SchemaRef, + read: R, + ) -> Result> { + match self { + StreamEncoding::Csv => Ok(Box::new( + arrow::csv::ReaderBuilder::new(schema).build(read)?, + )), + StreamEncoding::Json => { + let reader = arrow::json::ReaderBuilder::new(schema) + .build(BufReader::new(read))?; + + Ok(Box::new(reader)) + } + } + } + + fn writer(&self, write: W) -> Result> { + match self { + StreamEncoding::Csv => { + let writer = arrow::csv::WriterBuilder::new() + .with_header(false) + .build(write); + + Ok(Box::new(writer)) + } + StreamEncoding::Json => { + Ok(Box::new(arrow::json::LineDelimitedWriter::new(write))) + } + } + } +} + +impl FromStr for StreamEncoding { + type Err = DataFusionError; + + fn from_str(s: &str) -> std::result::Result { + match s.to_ascii_lowercase().as_str() { + "csv" => Ok(Self::Csv), + "json" => Ok(Self::Json), + _ => plan_err!("Unrecognised StreamEncoding {}", s), + } } } @@ -68,6 +128,7 @@ impl TableProviderFactory for StreamTableFactory { pub struct StreamConfig { schema: SchemaRef, location: PathBuf, + encoding: StreamEncoding, sort: Option, } @@ -77,6 +138,7 @@ impl StreamConfig { Self { schema, location, + encoding: StreamEncoding::Csv, sort: None, } } @@ -86,6 +148,12 @@ impl 
StreamConfig { self.sort = sort; self } + + /// Specify an encoding for the stream + pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self { + self.encoding = encoding; + self + } } /// A [`TableProvider`] for a stream source, such as a FIFO file @@ -160,7 +228,7 @@ impl PartitionStream for StreamRead { let tx = builder.tx(); builder.spawn_blocking(move || { let file = File::open(&config.location)?; - let reader = ReaderBuilder::new(config.schema.clone()).build(file)?; + let reader = config.encoding.reader(config.schema.clone(), file)?; for b in reader { if tx.blocking_send(b.map_err(Into::into)).is_err() { break; @@ -202,7 +270,7 @@ impl DataSink for StreamWrite { let write = AbortOnDropSingle::new(spawn_blocking(move || { let file = OpenOptions::new().write(true).open(&config.location)?; let mut count = 0_u64; - let mut writer = WriterBuilder::new().with_header(false).build(file); + let mut writer = config.encoding.writer(file)?; while let Some(batch) = receiver.blocking_recv() { count += batch.num_rows() as u64; writer.write(&batch)?; From 4aac384735922b6b4be21ed852591c6812f91487 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 09:39:39 +0000 Subject: [PATCH 05/13] Rework sort order --- .../core/src/datasource/listing/table.rs | 35 ++-------------- datafusion/core/src/datasource/mod.rs | 40 +++++++++++++++++++ datafusion/core/src/datasource/stream.rs | 37 ++++++++++++----- datafusion/core/tests/fifo.rs | 19 +++------ datafusion/physical-plan/src/streaming.rs | 24 ++++++++--- 5 files changed, 93 insertions(+), 62 deletions(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d26d417bd8b2..31dfc409bea3 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -26,6 +26,7 @@ use super::PartitionedFile; #[cfg(feature = "parquet")] use crate::datasource::file_format::parquet::ParquetFormat; use crate::datasource::{ + create_ordering, file_format::{ arrow::ArrowFormat, avro::AvroFormat, @@ -40,7 +41,6 @@ use crate::datasource::{ TableProvider, TableType, }; use crate::logical_expr::TableProviderFilterPushDown; -use crate::physical_plan; use crate::{ error::{DataFusionError, Result}, execution::context::SessionState, @@ -48,7 +48,6 @@ use crate::{ physical_plan::{empty::EmptyExec, ExecutionPlan, Statistics}, }; -use arrow::compute::SortOptions; use arrow::datatypes::{DataType, Field, SchemaBuilder, SchemaRef}; use arrow_schema::Schema; use datafusion_common::{ @@ -57,10 +56,9 @@ use datafusion_common::{ }; use datafusion_execution::cache::cache_manager::FileStatisticsCache; use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache; -use datafusion_expr::expr::Sort; use datafusion_optimizer::utils::conjunction; use datafusion_physical_expr::{ - create_physical_expr, LexOrdering, PhysicalSortExpr, PhysicalSortRequirement, + create_physical_expr, LexOrdering, PhysicalSortRequirement, }; use async_trait::async_trait; @@ -677,34 +675,7 @@ impl ListingTable { /// If file_sort_order is specified, creates the appropriate physical expressions fn try_create_output_ordering(&self) -> Result> { - let mut all_sort_orders = vec![]; - - for exprs in &self.options.file_sort_order { - // Construct PhsyicalSortExpr objects from Expr objects: - let sort_exprs = exprs - .iter() - .map(|expr| { - if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr { - if let Expr::Column(col) = expr.as_ref() { - let expr = 
physical_plan::expressions::col(&col.name, self.table_schema.as_ref())?; - Ok(PhysicalSortExpr { - expr, - options: SortOptions { - descending: !asc, - nulls_first: *nulls_first, - }, - }) - } else { - plan_err!("Expected single column references in output_ordering, got {expr}") - } - } else { - plan_err!("Expected Expr::Sort in output_ordering, but got {expr}") - } - }) - .collect::>>()?; - all_sort_orders.push(sort_exprs); - } - Ok(all_sort_orders) + create_ordering(&self.table_schema, &self.options.file_sort_order) } } diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 8db09e2333e1..7fe7916534f7 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -44,3 +44,43 @@ pub use self::provider::TableProvider; pub use self::view::ViewTable; pub use crate::logical_expr::TableType; pub use statistics::get_statistics_with_limit; + +use arrow_schema::{Schema, SortOptions}; +use datafusion_common::{plan_err, DataFusionError, Result}; +use datafusion_expr::expr::Sort; +use datafusion_expr::Expr; +use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; + +fn create_ordering( + schema: &Schema, + sort_order: &[Vec], +) -> Result> { + let mut all_sort_orders = vec![]; + + for exprs in sort_order { + // Construct PhsyicalSortExpr objects from Expr objects: + let sort_exprs = exprs + .iter() + .map(|expr| { + if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr { + if let Expr::Column(col) = expr.as_ref() { + let expr = expressions::col(&col.name, schema)?; + Ok(PhysicalSortExpr { + expr, + options: SortOptions { + descending: !asc, + nulls_first: *nulls_first, + }, + }) + } else { + plan_err!("Expected single column references in output_ordering, got {expr}") + } + } else { + plan_err!("Expected Expr::Sort in output_ordering, but got {expr}") + } + }) + .collect::>>()?; + all_sort_orders.push(sort_exprs); + } + Ok(all_sort_orders) +} diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 2bfd669c5766..acec1962a8cd 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -34,7 +34,6 @@ use tokio::task::spawn_blocking; use datafusion_common::{plan_err, DataFusionError, Result}; use datafusion_execution::{SendableRecordBatchStream, TaskContext}; use datafusion_expr::{CreateExternalTable, Expr, TableType}; -use datafusion_physical_expr::LexOrdering; use datafusion_physical_plan::common::AbortOnDropSingle; use datafusion_physical_plan::insert::{DataSink, FileSinkExec}; use datafusion_physical_plan::metrics::MetricsSet; @@ -43,7 +42,7 @@ use datafusion_physical_plan::streaming::{PartitionStream, StreamingTableExec}; use datafusion_physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan}; use crate::datasource::provider::TableProviderFactory; -use crate::datasource::TableProvider; +use crate::datasource::{create_ordering, TableProvider}; use crate::execution::context::SessionState; /// A [`TableProviderFactory`] for [`StreamTable`] @@ -60,8 +59,9 @@ impl TableProviderFactory for StreamTableFactory { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; - let config = - StreamConfig::new_file(schema, location.into()).with_encoding(encoding); + let config = StreamConfig::new_file(schema, location.into()) + .with_encoding(encoding) + .with_order(cmd.order_exprs.clone()); Ok(Arc::new(StreamTable(Arc::new(config)))) } @@ 
-129,7 +129,7 @@ pub struct StreamConfig { schema: SchemaRef, location: PathBuf, encoding: StreamEncoding, - sort: Option, + order: Vec>, } impl StreamConfig { @@ -139,13 +139,13 @@ impl StreamConfig { schema, location, encoding: StreamEncoding::Csv, - sort: None, + order: vec![], } } /// Specify a sort order for the stream - pub fn with_sort(mut self, sort: Option) -> Self { - self.sort = sort; + pub fn with_order(mut self, order: Vec>) -> Self { + self.order = order; self } @@ -187,11 +187,19 @@ impl TableProvider for StreamTable { _filters: &[Expr], _limit: Option, ) -> Result> { + let projected_schema = match projection { + Some(p) => { + let projected = self.0.schema.project(p)?; + create_ordering(&projected, &self.0.order)? + } + None => create_ordering(self.0.schema.as_ref(), &self.0.order)?, + }; + Ok(Arc::new(StreamingTableExec::try_new( self.0.schema.clone(), vec![Arc::new(StreamRead(self.0.clone())) as _], projection, - self.0.sort.clone(), + projected_schema, true, )?)) } @@ -202,8 +210,15 @@ impl TableProvider for StreamTable { input: Arc, _overwrite: bool, ) -> Result> { - let sort = self.0.sort.as_ref(); - let ordering = sort.map(|o| o.iter().map(|e| e.clone().into()).collect()); + let ordering = match self.0.order.first() { + Some(x) => { + let schema = self.0.schema.as_ref(); + let orders = create_ordering(schema, std::slice::from_ref(x))?; + let ordering = orders.into_iter().next().unwrap(); + Some(ordering.into_iter().map(Into::into).collect()) + } + None => None, + }; Ok(Arc::new(FileSinkExec::new( input, diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index b420b47f5c3d..0699b933da67 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -29,7 +29,7 @@ mod unix_test { use arrow::array::Array; use arrow::csv::ReaderBuilder; use arrow::datatypes::{DataType, Field, Schema}; - use arrow_schema::{SchemaRef, SortOptions}; + use arrow_schema::SchemaRef; use futures::StreamExt; use nix::sys::stat; use nix::unistd; @@ -40,21 +40,20 @@ mod unix_test { use datafusion::datasource::TableProvider; use datafusion::execution::context::SessionState; use datafusion::{ - physical_plan, prelude::{CsvReadOptions, SessionConfig, SessionContext}, test_util::{aggr_test_schema, arrow_test_data}, }; use datafusion_common::{exec_err, DataFusionError, Result}; use datafusion_execution::runtime_env::RuntimeEnv; - use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; + use datafusion_expr::Expr; /// Makes a TableProvider for a fifo file fn fifo_table( schema: SchemaRef, path: impl Into, - sort: Option, + sort: Vec>, ) -> Arc { - let config = StreamConfig::new_file(schema, path.into()).with_sort(sort); + let config = StreamConfig::new_file(schema, path.into()).with_order(sort); Arc::new(StreamTable::new(Arc::new(config))) } @@ -119,7 +118,7 @@ mod unix_test { ])); // Create a file with bounded or unbounded flag. 
- let provider = fifo_table(schema, fifo_path, None); + let provider = fifo_table(schema, fifo_path, vec![]); ctx.register_table("left", provider).unwrap(); // Register right table @@ -197,13 +196,7 @@ mod unix_test { ])); // Specify the ordering: - let order = Some(vec![PhysicalSortExpr { - expr: physical_plan::expressions::col("a1", schema.as_ref())?, - options: SortOptions { - descending: false, - nulls_first: false, - }, - }]); + let order = vec![vec![datafusion_expr::col("a1").sort(true, false)]]; // Set unbounded sorted files read configuration let provider = fifo_table(schema.clone(), left_fifo, order.clone()); diff --git a/datafusion/physical-plan/src/streaming.rs b/datafusion/physical-plan/src/streaming.rs index 27f03b727c29..d32ad8344ff0 100644 --- a/datafusion/physical-plan/src/streaming.rs +++ b/datafusion/physical-plan/src/streaming.rs @@ -28,7 +28,10 @@ use crate::{ExecutionPlan, Partitioning, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use datafusion_common::{internal_err, plan_err, DataFusionError, Result}; use datafusion_execution::TaskContext; -use datafusion_physical_expr::{LexOrdering, PhysicalSortExpr}; +use datafusion_physical_expr::{ + ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties, + PhysicalSortExpr, +}; use async_trait::async_trait; use futures::stream::StreamExt; @@ -48,7 +51,7 @@ pub struct StreamingTableExec { partitions: Vec>, projection: Option>, projected_schema: SchemaRef, - projected_output_ordering: Option, + projected_output_ordering: Vec, infinite: bool, } @@ -58,7 +61,7 @@ impl StreamingTableExec { schema: SchemaRef, partitions: Vec>, projection: Option<&Vec>, - projected_output_ordering: Option, + projected_output_ordering: impl IntoIterator, infinite: bool, ) -> Result { for x in partitions.iter() { @@ -81,7 +84,7 @@ impl StreamingTableExec { partitions, projected_schema, projection: projection.cloned().map(Into::into), - projected_output_ordering, + projected_output_ordering: projected_output_ordering.into_iter().collect(), infinite, }) } @@ -118,7 +121,7 @@ impl DisplayAs for StreamingTableExec { } self.projected_output_ordering - .as_deref() + .first() .map_or(Ok(()), |ordering| { if !ordering.is_empty() { write!( @@ -153,7 +156,16 @@ impl ExecutionPlan for StreamingTableExec { } fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - self.projected_output_ordering.as_deref() + self.projected_output_ordering + .first() + .map(|ordering| ordering.as_slice()) + } + + fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties { + ordering_equivalence_properties_helper( + self.schema(), + &self.projected_output_ordering, + ) } fn children(&self) -> Vec> { From f8ef869ede064114daf978ef49dc34ffbd989b38 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 09:51:35 +0000 Subject: [PATCH 06/13] Fix logical conflicts --- datafusion/core/src/datasource/listing/table.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index 31dfc409bea3..d93dbed65f8f 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1011,11 +1011,13 @@ mod tests { use arrow::datatypes::{DataType, Schema}; use arrow::record_batch::RecordBatch; + use arrow_schema::SortOptions; use datafusion_common::stats::Precision; use datafusion_common::{assert_contains, GetExt, ScalarValue}; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, 
Operator}; use rstest::*; use tempfile::TempDir; + use datafusion_physical_expr::PhysicalSortExpr; /// It creates dummy file and checks if it can create unbounded input executors. async fn unbounded_table_helper( From 629a2d0542ba1f45c87a603f5c1914e57602aaa9 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 09:53:29 +0000 Subject: [PATCH 07/13] Format --- datafusion/core/src/datasource/listing/table.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index d93dbed65f8f..c22eb58e88fa 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -1015,9 +1015,9 @@ mod tests { use datafusion_common::stats::Precision; use datafusion_common::{assert_contains, GetExt, ScalarValue}; use datafusion_expr::{BinaryExpr, LogicalPlanBuilder, Operator}; + use datafusion_physical_expr::PhysicalSortExpr; use rstest::*; use tempfile::TempDir; - use datafusion_physical_expr::PhysicalSortExpr; /// It creates dummy file and checks if it can create unbounded input executors. async fn unbounded_table_helper( From 0e0ff1484d031c2d47eb5e37aa32e671ed3cf02a Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 11:39:07 +0000 Subject: [PATCH 08/13] Add DefaultTableProvider --- .../src/datasource/listing_table_factory.rs | 9 ++--- datafusion/core/src/datasource/provider.rs | 33 +++++++++++++++++++ datafusion/core/src/datasource/stream.rs | 2 +- datafusion/core/src/execution/context/mod.rs | 14 ++++---- datafusion/core/tests/fifo.rs | 15 ++------- 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 26f40518979a..f9a7ab04ce68 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -44,18 +44,13 @@ use datafusion_expr::CreateExternalTable; use async_trait::async_trait; /// A `TableProviderFactory` capable of creating new `ListingTable`s +#[derive(Debug, Default)] pub struct ListingTableFactory {} impl ListingTableFactory { /// Creates a new `ListingTableFactory` pub fn new() -> Self { - Self {} - } -} - -impl Default for ListingTableFactory { - fn default() -> Self { - Self::new() + Self::default() } } diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 7d9f9e86d603..65fe54683c9c 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -26,6 +26,8 @@ use datafusion_expr::{CreateExternalTable, LogicalPlan}; pub use datafusion_expr::{TableProviderFilterPushDown, TableType}; use crate::arrow::datatypes::SchemaRef; +use crate::datasource::listing_table_factory::ListingTableFactory; +use crate::datasource::stream::StreamTableFactory; use crate::error::Result; use crate::execution::context::SessionState; use crate::logical_expr::Expr; @@ -214,3 +216,34 @@ pub trait TableProviderFactory: Sync + Send { cmd: &CreateExternalTable, ) -> Result>; } + +/// The default [`TableProviderFactory`] +/// +/// If [`CreateExternalTable`] is unbounded calls [`ListingTableFactory::create`], +/// otherwise calls [`ListingTableFactory::create`] +#[derive(Debug, Default)] +pub struct DefaultTableFactory { + stream: StreamTableFactory, + listing: ListingTableFactory, +} + +impl DefaultTableFactory { + /// Creates a new 
[`DefaultTableFactory`] + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait] +impl TableProviderFactory for DefaultTableFactory { + async fn create( + &self, + state: &SessionState, + cmd: &CreateExternalTable, + ) -> Result> { + match cmd.unbounded { + true => self.stream.create(state, cmd).await, + false => self.listing.create(state, cmd).await, + } + } +} diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index acec1962a8cd..d91fce2c2ccb 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -46,7 +46,7 @@ use crate::datasource::{create_ordering, TableProvider}; use crate::execution::context::SessionState; /// A [`TableProviderFactory`] for [`StreamTable`] -#[derive(Default)] +#[derive(Debug, Default)] pub struct StreamTableFactory {} #[async_trait] diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 9c500ec07293..7e5b950e7af6 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -27,7 +27,6 @@ use crate::{ catalog::{CatalogList, MemoryCatalogList}, datasource::{ listing::{ListingOptions, ListingTable}, - listing_table_factory::ListingTableFactory, provider::TableProviderFactory, }, datasource::{MemTable, ViewTable}, @@ -111,6 +110,7 @@ use datafusion_sql::planner::object_name_to_table_reference; use uuid::Uuid; // backwards compatibility +use crate::datasource::provider::DefaultTableFactory; use crate::execution::options::ArrowReadOptions; pub use datafusion_execution::config::SessionConfig; pub use datafusion_execution::TaskContext; @@ -1285,12 +1285,12 @@ impl SessionState { let mut table_factories: HashMap> = HashMap::new(); #[cfg(feature = "parquet")] - table_factories.insert("PARQUET".into(), Arc::new(ListingTableFactory::new())); - table_factories.insert("CSV".into(), Arc::new(ListingTableFactory::new())); - table_factories.insert("JSON".into(), Arc::new(ListingTableFactory::new())); - table_factories.insert("NDJSON".into(), Arc::new(ListingTableFactory::new())); - table_factories.insert("AVRO".into(), Arc::new(ListingTableFactory::new())); - table_factories.insert("ARROW".into(), Arc::new(ListingTableFactory::new())); + table_factories.insert("PARQUET".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("CSV".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("JSON".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("NDJSON".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("AVRO".into(), Arc::new(DefaultTableFactory::new())); + table_factories.insert("ARROW".into(), Arc::new(DefaultTableFactory::new())); if config.create_default_catalog_and_schema() { let default_catalog = MemoryCatalogProvider::new(); diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 0699b933da67..72580aae5582 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -20,7 +20,6 @@ #[cfg(target_family = "unix")] #[cfg(test)] mod unix_test { - use std::collections::HashMap; use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::PathBuf; @@ -36,15 +35,13 @@ mod unix_test { use tempfile::TempDir; use tokio::task::{spawn_blocking, JoinHandle}; - use datafusion::datasource::stream::{StreamConfig, StreamTable, StreamTableFactory}; + use datafusion::datasource::stream::{StreamConfig, StreamTable}; use datafusion::datasource::TableProvider; - 
use datafusion::execution::context::SessionState; use datafusion::{ prelude::{CsvReadOptions, SessionConfig, SessionContext}, test_util::{aggr_test_schema, arrow_test_data}, }; use datafusion_common::{exec_err, DataFusionError, Result}; - use datafusion_execution::runtime_env::RuntimeEnv; use datafusion_expr::Expr; /// Makes a TableProvider for a fifo file @@ -261,16 +258,8 @@ mod unix_test { #[tokio::test] async fn test_sql_insert_into_fifo() -> Result<()> { // create local execution context - let runtime = Arc::new(RuntimeEnv::default()); let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE); - let mut state = SessionState::new_with_config_rt(config, runtime); - let mut factories = HashMap::with_capacity(1); - factories.insert( - "CSV".to_string(), - Arc::new(StreamTableFactory::default()) as _, - ); - *state.table_factories_mut() = factories; - let ctx = SessionContext::new_with_state(state); + let ctx = SessionContext::new_with_config(config); // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; From 5daa47507a6c8af83a7da378792f7ec96f690f60 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 11:43:37 +0000 Subject: [PATCH 09/13] Fix doc --- datafusion/core/src/datasource/provider.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 65fe54683c9c..6c781360b3d9 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -219,7 +219,7 @@ pub trait TableProviderFactory: Sync + Send { /// The default [`TableProviderFactory`] /// -/// If [`CreateExternalTable`] is unbounded calls [`ListingTableFactory::create`], +/// If [`CreateExternalTable`] is unbounded calls [`StreamTableFactory::create`], /// otherwise calls [`ListingTableFactory::create`] #[derive(Debug, Default)] pub struct DefaultTableFactory { From 4de45aa1ba35b00f46f7d86282cba281c6ac2006 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Tue, 14 Nov 2023 12:32:32 +0000 Subject: [PATCH 10/13] Fix project sort keys and CSV headers --- datafusion/core/src/datasource/mod.rs | 47 ++++----- datafusion/core/src/datasource/provider.rs | 9 +- datafusion/core/src/datasource/stream.rs | 95 +++++++++++-------- datafusion/core/tests/fifo.rs | 4 + datafusion/sqllogictest/test_files/ddl.slt | 2 +- .../sqllogictest/test_files/groupby.slt | 10 +- datafusion/sqllogictest/test_files/window.slt | 12 +-- 7 files changed, 102 insertions(+), 77 deletions(-) diff --git a/datafusion/core/src/datasource/mod.rs b/datafusion/core/src/datasource/mod.rs index 7fe7916534f7..45f9bee6a58b 100644 --- a/datafusion/core/src/datasource/mod.rs +++ b/datafusion/core/src/datasource/mod.rs @@ -47,7 +47,6 @@ pub use statistics::get_statistics_with_limit; use arrow_schema::{Schema, SortOptions}; use datafusion_common::{plan_err, DataFusionError, Result}; -use datafusion_expr::expr::Sort; use datafusion_expr::Expr; use datafusion_physical_expr::{expressions, LexOrdering, PhysicalSortExpr}; @@ -58,29 +57,33 @@ fn create_ordering( let mut all_sort_orders = vec![]; for exprs in sort_order { - // Construct PhsyicalSortExpr objects from Expr objects: - let sort_exprs = exprs - .iter() - .map(|expr| { - if let Expr::Sort(Sort { expr, asc, nulls_first }) = expr { - if let Expr::Column(col) = expr.as_ref() { - let expr = expressions::col(&col.name, schema)?; - Ok(PhysicalSortExpr { - expr, - options: SortOptions { - descending: !asc, - nulls_first: *nulls_first, - }, - 
}) - } else { - plan_err!("Expected single column references in output_ordering, got {expr}") + // Construct PhysicalSortExpr objects from Expr objects: + let mut sort_exprs = vec![]; + for expr in exprs { + match expr { + Expr::Sort(sort) => match sort.expr.as_ref() { + Expr::Column(col) => match expressions::col(&col.name, schema) { + Ok(expr) => { + sort_exprs.push(PhysicalSortExpr { + expr, + options: SortOptions { + descending: !sort.asc, + nulls_first: sort.nulls_first, + }, + }); + } + // Cannot find expression in the projected_schema, stop iterating + // since rest of the orderings are violated + Err(_) => break, } - } else { - plan_err!("Expected Expr::Sort in output_ordering, but got {expr}") + expr => return plan_err!("Expected single column references in output_ordering, got {expr}"), } - }) - .collect::>>()?; - all_sort_orders.push(sort_exprs); + expr => return plan_err!("Expected Expr::Sort in output_ordering, but got {expr}"), + } + } + if !sort_exprs.is_empty() { + all_sort_orders.push(sort_exprs); + } } Ok(all_sort_orders) } diff --git a/datafusion/core/src/datasource/provider.rs b/datafusion/core/src/datasource/provider.rs index 6c781360b3d9..4fe433044e6c 100644 --- a/datafusion/core/src/datasource/provider.rs +++ b/datafusion/core/src/datasource/provider.rs @@ -241,7 +241,14 @@ impl TableProviderFactory for DefaultTableFactory { state: &SessionState, cmd: &CreateExternalTable, ) -> Result> { - match cmd.unbounded { + let mut unbounded = cmd.unbounded; + for (k, v) in &cmd.options { + if k.eq_ignore_ascii_case("unbounded") && v.eq_ignore_ascii_case("true") { + unbounded = true + } + } + + match unbounded { true => self.stream.create(state, cmd).await, false => self.listing.create(state, cmd).await, } diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index d91fce2c2ccb..5849888931f4 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -20,7 +20,7 @@ use std::any::Any; use std::fmt::Formatter; use std::fs::{File, OpenOptions}; -use std::io::{BufReader, Read, Write}; +use std::io::BufReader; use std::path::PathBuf; use std::str::FromStr; use std::sync::Arc; @@ -59,9 +59,11 @@ impl TableProviderFactory for StreamTableFactory { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); let location = cmd.location.clone(); let encoding = cmd.file_type.parse()?; + let config = StreamConfig::new_file(schema, location.into()) .with_encoding(encoding) - .with_order(cmd.order_exprs.clone()); + .with_order(cmd.order_exprs.clone()) + .with_header(cmd.has_header); Ok(Arc::new(StreamTable(Arc::new(config)))) } @@ -76,41 +78,6 @@ pub enum StreamEncoding { Json, } -impl StreamEncoding { - fn reader( - &self, - schema: SchemaRef, - read: R, - ) -> Result> { - match self { - StreamEncoding::Csv => Ok(Box::new( - arrow::csv::ReaderBuilder::new(schema).build(read)?, - )), - StreamEncoding::Json => { - let reader = arrow::json::ReaderBuilder::new(schema) - .build(BufReader::new(read))?; - - Ok(Box::new(reader)) - } - } - } - - fn writer(&self, write: W) -> Result> { - match self { - StreamEncoding::Csv => { - let writer = arrow::csv::WriterBuilder::new() - .with_header(false) - .build(write); - - Ok(Box::new(writer)) - } - StreamEncoding::Json => { - Ok(Box::new(arrow::json::LineDelimitedWriter::new(write))) - } - } - } -} - impl FromStr for StreamEncoding { type Err = DataFusionError; @@ -129,6 +96,7 @@ pub struct StreamConfig { schema: SchemaRef, location: PathBuf, encoding: StreamEncoding, + 
header: bool, order: Vec>, } @@ -140,6 +108,7 @@ impl StreamConfig { location, encoding: StreamEncoding::Csv, order: vec![], + header: false, } } @@ -149,11 +118,55 @@ impl StreamConfig { self } + /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`]) + pub fn with_header(mut self, header: bool) -> Self { + self.header = header; + self + } + /// Specify an encoding for the stream pub fn with_encoding(mut self, encoding: StreamEncoding) -> Self { self.encoding = encoding; self } + + fn reader(&self) -> Result> { + let file = File::open(&self.location)?; + let schema = self.schema.clone(); + match &self.encoding { + StreamEncoding::Csv => { + let reader = arrow::csv::ReaderBuilder::new(schema) + .with_header(self.header) + .build(file)?; + + Ok(Box::new(reader)) + } + StreamEncoding::Json => { + let reader = arrow::json::ReaderBuilder::new(schema) + .build(BufReader::new(file))?; + + Ok(Box::new(reader)) + } + } + } + + fn writer(&self) -> Result> { + match &self.encoding { + StreamEncoding::Csv => { + let header = self.header && !self.location.exists(); + let file = OpenOptions::new().write(true).open(&self.location)?; + let writer = arrow::csv::WriterBuilder::new() + .with_header(header) + .build(file); + + Ok(Box::new(writer)) + } + StreamEncoding::Json => { + let file = OpenOptions::new().write(true).open(&self.location)?; + Ok(Box::new(arrow::json::LineDelimitedWriter::new(file))) + } + } + } } /// A [`TableProvider`] for a stream source, such as a FIFO file @@ -177,7 +190,7 @@ impl TableProvider for StreamTable { } fn table_type(&self) -> TableType { - TableType::Temporary + TableType::Base } async fn scan( @@ -242,8 +255,7 @@ impl PartitionStream for StreamRead { let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); let tx = builder.tx(); builder.spawn_blocking(move || { - let file = File::open(&config.location)?; - let reader = config.encoding.reader(config.schema.clone(), file)?; + let reader = config.reader()?; for b in reader { if tx.blocking_send(b.map_err(Into::into)).is_err() { break; @@ -283,9 +295,8 @@ impl DataSink for StreamWrite { let (sender, mut receiver) = tokio::sync::mpsc::channel::(2); // Note: FIFO Files support poll so this could use AsyncFd let write = AbortOnDropSingle::new(spawn_blocking(move || { - let file = OpenOptions::new().write(true).open(&config.location)?; let mut count = 0_u64; - let mut writer = config.encoding.writer(file)?; + let mut writer = config.writer()?; while let Some(batch) = receiver.blocking_recv() { count += batch.num_rows() as u64; writer.write(&batch)?; diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 72580aae5582..803a78f8fcdc 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -314,6 +314,8 @@ mod unix_test { a2 INT NOT NULL ) STORED AS CSV + WITH HEADER ROW + OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{source_display_fifo_path}'" )) .await?; @@ -325,6 +327,8 @@ mod unix_test { a2 INT NOT NULL ) STORED AS CSV + WITH HEADER ROW + OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{sink_display_fifo_path}'" )) .await?; diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index ed4f4b4a11ac..682972b5572a 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -750,7 +750,7 @@ query TT explain select c1 from t; ---- logical_plan TableScan: t projection=[c1] -physical_plan CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/empty.csv]]}, projection=[c1], infinite_source=true, has_header=true +physical_plan StreamingTableExec: partition_sizes=1, projection=[c1], infinite_source=true statement ok drop table t; diff --git a/datafusion/sqllogictest/test_files/groupby.slt b/datafusion/sqllogictest/test_files/groupby.slt index 105f11f21628..f5eda4b72538 100644 --- a/datafusion/sqllogictest/test_files/groupby.slt +++ b/datafusion/sqllogictest/test_files/groupby.slt @@ -2115,7 +2115,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, SUM(annotate physical_plan ProjectionExec: expr=[a@1 as a, b@0 as b, SUM(annotated_data_infinite2.c)@2 as summation1] --AggregateExec: mode=Single, gby=[b@1 as b, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=Sorted -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III @@ -2146,7 +2146,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.d, SUM(annotate physical_plan ProjectionExec: expr=[a@1 as a, d@0 as d, SUM(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as summation1] --AggregateExec: mode=Single, gby=[d@2 as d, a@0 as a], aggr=[SUM(annotated_data_infinite2.c)], ordering_mode=PartiallySorted([1]) -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST], has_header=true +----StreamingTableExec: partition_sizes=1, projection=[a, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST] query III SELECT a, d, @@ -2179,7 +2179,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, FIRST_VALUE( physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, FIRST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as first_c] --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[FIRST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III SELECT a, b, FIRST_VALUE(c ORDER BY a DESC) as first_c @@ -2205,7 +2205,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c) ORDER BY [annotated_data_infinite2.a DESC NULLS FIRST]@2 as last_c] --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC 
NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III SELECT a, b, LAST_VALUE(c ORDER BY a DESC) as last_c @@ -2232,7 +2232,7 @@ Projection: annotated_data_infinite2.a, annotated_data_infinite2.b, LAST_VALUE(a physical_plan ProjectionExec: expr=[a@0 as a, b@1 as b, LAST_VALUE(annotated_data_infinite2.c)@2 as last_c] --AggregateExec: mode=Single, gby=[a@0 as a, b@1 as b], aggr=[LAST_VALUE(annotated_data_infinite2.c)], ordering_mode=Sorted -----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +----StreamingTableExec: partition_sizes=1, projection=[a, b, c], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query III SELECT a, b, LAST_VALUE(c) as last_c diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index 2eb0576d559b..c667faf9f431 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -2812,7 +2812,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as count2, ts@0 as ts] ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }], mode=[Sorted] --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC 
NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }], mode=[Sorted] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true +----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST] query IIII @@ -2858,7 +2858,7 @@ ProjectionExec: expr=[sum1@0 as sum1, sum2@1 as sum2, count1@2 as count1, count2 ----ProjectionExec: expr=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@4 as sum1, SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@2 as sum2, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING@5 as count1, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING@3 as count2, ts@0 as ts] ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING: Ok(Field { name: "COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(1)) }], mode=[Sorted] --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }, COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "COUNT(annotated_data_infinite.inc_col) ORDER BY [annotated_data_infinite.ts DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: 
Preceding(UInt64(NULL)), end_bound: Following(UInt64(3)) }], mode=[Sorted] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST], has_header=true +----------StreamingTableExec: partition_sizes=1, projection=[ts, inc_col], infinite_source=true, output_ordering=[ts@0 ASC NULLS LAST] query IIII @@ -2962,7 +2962,7 @@ ProjectionExec: expr=[a@1 as a, b@2 as b, c@3 as c, SUM(annotated_data_infinite2 ------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: CurrentRow }], mode=[PartiallySorted([0, 1])] --------------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 2 PRECEDING AND 1 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(2)), end_bound: Following(UInt64(1)) }, SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING: Ok(Field { name: "SUM(annotated_data_infinite2.c) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(5)), end_bound: Following(UInt64(5)) }], mode=[Sorted] ----------------ProjectionExec: expr=[CAST(c@2 AS Int64) as CAST(annotated_data_infinite2.c AS Int64)annotated_data_infinite2.c, a@0 as a, b@1 as b, c@2 as c, d@3 as d] -------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS 
LAST, c@2 ASC NULLS LAST], has_header=true +------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] query IIIIIIIIIIIIIII @@ -3104,7 +3104,7 @@ CoalesceBatchesExec: target_batch_size=4096 ----GlobalLimitExec: skip=0, fetch=5 ------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@5 as rn1] --------BoundedWindowAggExec: wdw=[ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_infinite2.a ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] -----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true +----------StreamingTableExec: partition_sizes=1, projection=[a0, a, b, c, d], infinite_source=true, output_ordering=[a@1 ASC NULLS LAST, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST] # this is a negative test for asserting that window functions (other than ROW_NUMBER) # are not added to ordering equivalence @@ -3217,7 +3217,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da ------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.b, annotated_data_infinite2.a] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] --------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.d] ORDER BY [annotated_data_infinite2.b ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[PartiallySorted([0])] ----------BoundedWindowAggExec: wdw=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "SUM(annotated_data_infinite2.a) PARTITION BY [annotated_data_infinite2.a, annotated_data_infinite2.b] ORDER BY [annotated_data_infinite2.c ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Int64, nullable: true, dict_id: 
0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(NULL)), end_bound: CurrentRow }], mode=[Sorted] -------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] statement ok set datafusion.execution.target_partitions = 2; @@ -3255,7 +3255,7 @@ ProjectionExec: expr=[SUM(annotated_data_infinite2.a) PARTITION BY [annotated_da ------------------------CoalesceBatchesExec: target_batch_size=4096 --------------------------SortPreservingRepartitionExec: partitioning=Hash([a@0, b@1], 2), input_partitions=2, sort_exprs=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST,c@2 ASC NULLS LAST ----------------------------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -------------------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], has_header=true +------------------------------StreamingTableExec: partition_sizes=1, projection=[a, b, c, d], infinite_source=true, output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST] # reset the partition number 1 again statement ok From 49de1e4092d4749b32d5535a1804f06c53c841f3 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Wed, 15 Nov 2023 09:55:47 +0000 Subject: [PATCH 11/13] Respect batch size on read --- datafusion/core/src/datasource/stream.rs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 5849888931f4..466d49bbdf69 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -130,19 +130,21 @@ impl StreamConfig { self } - fn reader(&self) -> Result> { + fn reader(&self, batch_size: usize) -> Result> { let file = File::open(&self.location)?; let schema = self.schema.clone(); match &self.encoding { StreamEncoding::Csv => { let reader = arrow::csv::ReaderBuilder::new(schema) .with_header(self.header) + .with_batch_size(batch_size) .build(file)?; Ok(Box::new(reader)) } StreamEncoding::Json => { let reader = arrow::json::ReaderBuilder::new(schema) + .with_batch_size(batch_size) .build(BufReader::new(file))?; Ok(Box::new(reader)) @@ -249,13 +251,14 @@ impl PartitionStream for StreamRead { &self.0.schema } - fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { + fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { let config = self.0.clone(); let schema = self.0.schema.clone(); + let batch_size = ctx.session_config().batch_size(); let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); let tx = builder.tx(); builder.spawn_blocking(move || { - let reader = config.reader()?; + let reader = config.reader(batch_size)?; for b in reader { if tx.blocking_send(b.map_err(Into::into)).is_err() { break; From e5597318ada0e2643c333376286631842b7456e8 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Wed, 15 Nov 2023 13:52:27 +0300 Subject: [PATCH 12/13] Tests are updated --- datafusion/core/src/datasource/stream.rs | 24 ++-- 
datafusion/core/tests/fifo.rs | 154 ++++++++++++++++------- 2 files changed, 126 insertions(+), 52 deletions(-) diff --git a/datafusion/core/src/datasource/stream.rs b/datafusion/core/src/datasource/stream.rs index 466d49bbdf69..cf95dd249a7f 100644 --- a/datafusion/core/src/datasource/stream.rs +++ b/datafusion/core/src/datasource/stream.rs @@ -53,7 +53,7 @@ pub struct StreamTableFactory {} impl TableProviderFactory for StreamTableFactory { async fn create( &self, - _state: &SessionState, + state: &SessionState, cmd: &CreateExternalTable, ) -> Result> { let schema: SchemaRef = Arc::new(cmd.schema.as_ref().into()); @@ -63,7 +63,8 @@ impl TableProviderFactory for StreamTableFactory { let config = StreamConfig::new_file(schema, location.into()) .with_encoding(encoding) .with_order(cmd.order_exprs.clone()) - .with_header(cmd.has_header); + .with_header(cmd.has_header) + .with_batch_size(state.config().batch_size()); Ok(Arc::new(StreamTable(Arc::new(config)))) } @@ -95,6 +96,7 @@ impl FromStr for StreamEncoding { pub struct StreamConfig { schema: SchemaRef, location: PathBuf, + batch_size: usize, encoding: StreamEncoding, header: bool, order: Vec>, @@ -106,6 +108,7 @@ impl StreamConfig { Self { schema, location, + batch_size: 1024, encoding: StreamEncoding::Csv, order: vec![], header: false, @@ -118,6 +121,12 @@ impl StreamConfig { self } + /// Specify the batch size + pub fn with_batch_size(mut self, batch_size: usize) -> Self { + self.batch_size = batch_size; + self + } + /// Specify whether the file has a header (only applicable for [`StreamEncoding::Csv`]) pub fn with_header(mut self, header: bool) -> Self { self.header = header; @@ -130,21 +139,21 @@ impl StreamConfig { self } - fn reader(&self, batch_size: usize) -> Result> { + fn reader(&self) -> Result> { let file = File::open(&self.location)?; let schema = self.schema.clone(); match &self.encoding { StreamEncoding::Csv => { let reader = arrow::csv::ReaderBuilder::new(schema) .with_header(self.header) - .with_batch_size(batch_size) + .with_batch_size(self.batch_size) .build(file)?; Ok(Box::new(reader)) } StreamEncoding::Json => { let reader = arrow::json::ReaderBuilder::new(schema) - .with_batch_size(batch_size) + .with_batch_size(self.batch_size) .build(BufReader::new(file))?; Ok(Box::new(reader)) @@ -251,14 +260,13 @@ impl PartitionStream for StreamRead { &self.0.schema } - fn execute(&self, ctx: Arc) -> SendableRecordBatchStream { + fn execute(&self, _ctx: Arc) -> SendableRecordBatchStream { let config = self.0.clone(); let schema = self.0.schema.clone(); - let batch_size = ctx.session_config().batch_size(); let mut builder = RecordBatchReceiverStreamBuilder::new(schema, 2); let tx = builder.tx(); builder.spawn_blocking(move || { - let reader = config.reader(batch_size)?; + let reader = config.reader()?; for b in reader { if tx.blocking_send(b.map_err(Into::into)).is_err() { break; diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 803a78f8fcdc..52355d8d3b8e 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -23,7 +23,10 @@ mod unix_test { use std::fs::{File, OpenOptions}; use std::io::Write; use std::path::PathBuf; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::thread; + use std::time::{Duration, Instant}; use arrow::array::Array; use arrow::csv::ReaderBuilder; @@ -50,20 +53,13 @@ mod unix_test { path: impl Into, sort: Vec>, ) -> Arc { - let config = StreamConfig::new_file(schema, path.into()).with_order(sort); + let config = 
StreamConfig::new_file(schema, path.into()) + .with_order(sort) + .with_batch_size(TEST_BATCH_SIZE) + .with_header(true); Arc::new(StreamTable::new(Arc::new(config))) } - // ! For the sake of the test, do not alter the numbers. ! - // Session batch size - const TEST_BATCH_SIZE: usize = 20; - // Number of lines written to FIFO - const TEST_DATA_SIZE: usize = 20_000; - // Number of lines what can be joined. Each joinable key produced 20 lines with - // aggregate_test_100 dataset. We will use these joinable keys for understanding - // incremental execution. - const TEST_JOIN_RATIO: f64 = 0.01; - fn create_fifo_file(tmp_dir: &TempDir, file_name: &str) -> Result { let file_path = tmp_dir.path().join(file_name); // Simulate an infinite environment via a FIFO file @@ -74,20 +70,80 @@ mod unix_test { } } + fn write_to_fifo( + mut file: &File, + line: &str, + ref_time: Instant, + broken_pipe_timeout: Duration, + ) -> Result<()> { + // We need to handle broken pipe error until the reader is ready. This + // is why we use a timeout to limit the wait duration for the reader. + // If the error is different than broken pipe, we fail immediately. + while let Err(e) = file.write_all(line.as_bytes()) { + if e.raw_os_error().unwrap() == 32 { + let interval = Instant::now().duration_since(ref_time); + if interval < broken_pipe_timeout { + thread::sleep(Duration::from_millis(100)); + continue; + } + } + return exec_err!("{}", e); + } + Ok(()) + } + + fn create_writing_thread( + file_path: PathBuf, + header: String, + lines: Vec, + waiting_lock: Arc, + wait_until: usize, + ) -> JoinHandle<()> { + // Timeout for a long period of BrokenPipe error + let broken_pipe_timeout = Duration::from_secs(10); + let sa = file_path.clone(); + // Spawn a new thread to write to the FIFO file + spawn_blocking(move || { + let file = OpenOptions::new().write(true).open(sa).unwrap(); + // Reference time to use when deciding to fail the test + let execution_start = Instant::now(); + write_to_fifo(&file, &header, execution_start, broken_pipe_timeout).unwrap(); + for (cnt, line) in lines.iter().enumerate() { + while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until { + thread::sleep(Duration::from_millis(50)); + } + write_to_fifo(&file, &line, execution_start, broken_pipe_timeout) + .unwrap(); + } + drop(file); + }) + } + + // ! For the sake of the test, do not alter the numbers. ! + // Session batch size + const TEST_BATCH_SIZE: usize = 20; + // Number of lines written to FIFO + const TEST_DATA_SIZE: usize = 20_000; + // Number of lines what can be joined. Each joinable key produced 20 lines with + // aggregate_test_100 dataset. We will use these joinable keys for understanding + // incremental execution. + const TEST_JOIN_RATIO: f64 = 0.01; + // This test provides a relatively realistic end-to-end scenario where // we swap join sides to accommodate a FIFO source. 
- #[tokio::test] + #[tokio::test(flavor = "multi_thread", worker_threads = 8)] async fn unbounded_file_with_swapped_join() -> Result<()> { // Create session context let config = SessionConfig::new() .with_batch_size(TEST_BATCH_SIZE) .with_collect_statistics(false) .with_target_partitions(1); - let ctx = SessionContext::new_with_config(config); + // To make unbounded deterministic + let waiting = Arc::new(AtomicBool::new(true)); // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; - let fifo_path = create_fifo_file(&tmp_dir, "fifo_file.csv")?; + let fifo_path = create_fifo_file(&tmp_dir, "fifo_unbounded.csv")?; // Execution can calculated at least one RecordBatch after the number of // "joinable_lines_length" lines are read. let joinable_lines_length = @@ -106,7 +162,13 @@ mod unix_test { .map(|(a1, a2)| format!("{a1},{a2}\n")) .collect::>(); // Create writing threads for the left and right FIFO files - let task = create_writing_thread(fifo_path.clone(), lines); + let task = create_writing_thread( + fifo_path.clone(), + "a1,a2\n".to_owned(), + lines, + waiting.clone(), + joinable_lines_length * 2, + ); // Data Schema let schema = Arc::new(Schema::new(vec![ @@ -114,7 +176,6 @@ mod unix_test { Field::new("a2", DataType::UInt32, false), ])); - // Create a file with bounded or unbounded flag. let provider = fifo_table(schema, fifo_path, vec![]); ctx.register_table("left", provider).unwrap(); @@ -130,7 +191,9 @@ mod unix_test { // Execute the query let df = ctx.sql("SELECT t1.a2, t2.c1, t2.c4, t2.c5 FROM left as t1 JOIN right as t2 ON t1.a1 = t2.c1").await?; let mut stream = df.execute_stream().await?; - while (stream.next().await).is_some() {} + while (stream.next().await).is_some() { + waiting.store(false, Ordering::SeqCst); + } task.await.unwrap(); Ok(()) } @@ -142,16 +205,6 @@ mod unix_test { Equal, } - fn create_writing_thread(file_path: PathBuf, lines: Vec) -> JoinHandle<()> { - spawn_blocking(move || { - let mut file = OpenOptions::new().write(true).open(file_path).unwrap(); - for line in &lines { - file.write_all(line.as_bytes()).unwrap() - } - file.flush().unwrap(); - }) - } - // This test provides a relatively realistic end-to-end scenario where // we change the join into a [SymmetricHashJoin] to accommodate two // unbounded (FIFO) sources. @@ -163,6 +216,8 @@ mod unix_test { .set_bool("datafusion.execution.coalesce_batches", false) .with_target_partitions(1); let ctx = SessionContext::new_with_config(config); + // Tasks + let mut tasks: Vec> = vec![]; // Join filter let a1_iter = 0..TEST_DATA_SIZE; @@ -179,12 +234,24 @@ mod unix_test { let left_fifo = create_fifo_file(&tmp_dir, "left.csv")?; // Create a FIFO file for the right input source. let right_fifo = create_fifo_file(&tmp_dir, "right.csv")?; + // Create a mutex for tracking if the right input source is waiting for data. + let waiting = Arc::new(AtomicBool::new(true)); // Create writing threads for the left and right FIFO files - let tasks = vec![ - create_writing_thread(left_fifo.clone(), lines.clone()), - create_writing_thread(right_fifo.clone(), lines.clone()), - ]; + tasks.push(create_writing_thread( + left_fifo.clone(), + "a1,a2\n".to_owned(), + lines.clone(), + waiting.clone(), + TEST_BATCH_SIZE, + )); + tasks.push(create_writing_thread( + right_fifo.clone(), + "a1,a2\n".to_owned(), + lines.clone(), + waiting.clone(), + TEST_BATCH_SIZE, + )); // Create schema let schema = Arc::new(Schema::new(vec![ @@ -221,6 +288,7 @@ mod unix_test { let mut operations = vec![]; // Partial. 
while let Some(Ok(batch)) = stream.next().await { + waiting.store(false, Ordering::SeqCst); let left_unmatched = batch.column(2).null_count(); let right_unmatched = batch.column(0).null_count(); let op = if left_unmatched == 0 && right_unmatched == 0 { @@ -257,10 +325,12 @@ mod unix_test { /// It tests the INSERT INTO functionality. #[tokio::test] async fn test_sql_insert_into_fifo() -> Result<()> { + // To make unbounded deterministic + let waiting = Arc::new(AtomicBool::new(true)); + let waiting_thread = waiting.clone(); // create local execution context let config = SessionConfig::new().with_batch_size(TEST_BATCH_SIZE); let ctx = SessionContext::new_with_config(config); - // Create a new temporary FIFO file let tmp_dir = TempDir::new()?; let source_fifo_path = create_fifo_file(&tmp_dir, "source.csv")?; @@ -274,9 +344,12 @@ mod unix_test { // thread. This approach ensures that the pipeline remains unbroken. tasks.push(create_writing_thread( source_fifo_path_thread, + "a1,a2\n".to_owned(), (0..TEST_DATA_SIZE) .map(|_| "a,1\n".to_string()) .collect::>(), + waiting, + TEST_BATCH_SIZE, )); // Create a new temporary FIFO file let sink_fifo_path = create_fifo_file(&tmp_dir, "sink.csv")?; @@ -298,37 +371,30 @@ mod unix_test { .map_err(|e| DataFusionError::Internal(e.to_string())) .unwrap(); - let mut remaining = TEST_DATA_SIZE; - - while let Some(Ok(b)) = reader.next() { - remaining = remaining.checked_sub(b.num_rows()).unwrap(); - if remaining == 0 { - break; - } + while let Some(Ok(_)) = reader.next() { + waiting_thread.store(false, Ordering::SeqCst); } })); // register second csv file with the SQL (create an empty file if not found) ctx.sql(&format!( - "CREATE EXTERNAL TABLE source_table ( + "CREATE UNBOUNDED EXTERNAL TABLE source_table ( a1 VARCHAR NOT NULL, a2 INT NOT NULL ) STORED AS CSV WITH HEADER ROW - OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{source_display_fifo_path}'" )) .await?; // register csv file with the SQL ctx.sql(&format!( - "CREATE EXTERNAL TABLE sink_table ( + "CREATE UNBOUNDED EXTERNAL TABLE sink_table ( a1 VARCHAR NOT NULL, a2 INT NOT NULL ) STORED AS CSV WITH HEADER ROW - OPTIONS ('UNBOUNDED' 'TRUE') LOCATION '{sink_display_fifo_path}'" )) .await?; @@ -338,7 +404,7 @@ mod unix_test { .await?; // Start execution - let _ = df.collect().await.unwrap(); + df.collect().await?; futures::future::try_join_all(tasks).await.unwrap(); Ok(()) } From 917d06005e78ae5d4bd789504de04a985cbc0b71 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Wed, 15 Nov 2023 14:02:43 +0300 Subject: [PATCH 13/13] Resolving clippy --- datafusion/core/tests/fifo.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/core/tests/fifo.rs b/datafusion/core/tests/fifo.rs index 52355d8d3b8e..93c7f7368065 100644 --- a/datafusion/core/tests/fifo.rs +++ b/datafusion/core/tests/fifo.rs @@ -112,8 +112,7 @@ mod unix_test { while waiting_lock.load(Ordering::SeqCst) && cnt > wait_until { thread::sleep(Duration::from_millis(50)); } - write_to_fifo(&file, &line, execution_start, broken_pipe_timeout) - .unwrap(); + write_to_fifo(&file, line, execution_start, broken_pipe_timeout).unwrap(); } drop(file); })
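
For reference, the snippet below is a minimal usage sketch (not part of the patch series) showing how the `StreamTable` / `StreamConfig` API introduced above can be wired up by hand, alongside the equivalent `CREATE UNBOUNDED EXTERNAL TABLE` DDL exercised in `fifo.rs`. The file paths, table names, and schema are hypothetical placeholders, the FIFO files are assumed to already exist, and a tokio runtime is assumed.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion::datasource::stream::{StreamConfig, StreamTable};
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Session with a small batch size, mirroring the FIFO tests above.
    let ctx =
        SessionContext::new_with_config(SessionConfig::new().with_batch_size(20));

    // Schema of the records flowing through the (pre-existing) FIFO file.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a1", DataType::Utf8, false),
        Field::new("a2", DataType::UInt32, false),
    ]));

    // Programmatic registration: build a StreamConfig with the header and
    // batch-size options added in this series, then wrap it in a StreamTable.
    let config = StreamConfig::new_file(schema, "/tmp/source.csv".into())
        .with_header(true)
        .with_batch_size(20);
    ctx.register_table(
        "source_table",
        Arc::new(StreamTable::new(Arc::new(config))),
    )?;

    // SQL registration: the UNBOUNDED keyword routes the statement to
    // StreamTableFactory via DefaultTableFactory.
    ctx.sql(
        "CREATE UNBOUNDED EXTERNAL TABLE sink_table (
             a1 VARCHAR NOT NULL,
             a2 INT NOT NULL
         ) STORED AS CSV WITH HEADER ROW LOCATION '/tmp/sink.csv'",
    )
    .await?;

    Ok(())
}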