diff --git a/datafusion/core/src/datasource/listing/helpers.rs b/datafusion/core/src/datasource/listing/helpers.rs
index 873d005b4baf..b4b93d5fc17e 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -162,7 +162,7 @@ pub fn split_files(
 pub async fn pruned_partition_list<'a>(
     store: &'a dyn ObjectStore,
     table_path: &'a ListingTableUrl,
-    filters: &[Expr],
+    filters: &'a [Expr],
     file_extension: &'a str,
     table_partition_cols: &'a [String],
 ) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
diff --git a/datafusion/core/src/physical_plan/file_format/chunked_store.rs b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
new file mode 100644
index 000000000000..216926b06713
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/chunked_store.rs
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use bytes::Bytes;
+use futures::stream::BoxStream;
+use futures::StreamExt;
+use object_store::path::Path;
+use object_store::Result;
+use object_store::{GetResult, ListResult, ObjectMeta, ObjectStore};
+use std::fmt::{Debug, Display, Formatter};
+use std::ops::Range;
+use std::sync::Arc;
+
+/// Wraps a [`ObjectStore`] and makes its get response return chunks
+///
+/// TODO: Upstream into object_store_rs
+#[derive(Debug)]
+pub struct ChunkedStore {
+    inner: Arc<dyn ObjectStore>,
+    chunk_size: usize,
+}
+
+impl ChunkedStore {
+    pub fn new(inner: Arc<dyn ObjectStore>, chunk_size: usize) -> Self {
+        Self { inner, chunk_size }
+    }
+}
+
+impl Display for ChunkedStore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "ChunkedStore({})", self.inner)
+    }
+}
+
+#[async_trait]
+impl ObjectStore for ChunkedStore {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+        self.inner.put(location, bytes).await
+    }
+
+    async fn get(&self, location: &Path) -> Result<GetResult> {
+        let bytes = self.inner.get(location).await?.bytes().await?;
+        let mut offset = 0;
+        let chunk_size = self.chunk_size;
+
+        Ok(GetResult::Stream(
+            futures::stream::iter(std::iter::from_fn(move || {
+                let remaining = bytes.len() - offset;
+                if remaining == 0 {
+                    return None;
+                }
+                let to_read = remaining.min(chunk_size);
+                let next_offset = offset + to_read;
+                let slice = bytes.slice(offset..next_offset);
+                offset = next_offset;
+                Some(Ok(slice))
+            }))
+            .boxed(),
+        ))
+    }
+
+    async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
+        self.inner.get_range(location, range).await
+    }
+
+    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
+        self.inner.head(location).await
+    }
+
+    async fn delete(&self, location: &Path) -> Result<()> {
+        self.inner.delete(location).await
+    }
+
+    async fn list(
+        &self,
+        prefix: Option<&Path>,
+    ) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
+        self.inner.list(prefix).await
+    }
+
+    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
+        self.inner.list_with_delimiter(prefix).await
+    }
+
+    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy(from, to).await
+    }
+
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.inner.copy_if_not_exists(from, to).await
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::memory::InMemory;
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let location = Path::parse("test").unwrap();
+        let store = Arc::new(InMemory::new());
+        store
+            .put(&location, Bytes::from(vec![0; 1001]))
+            .await
+            .unwrap();
+
+        for chunk_size in [10, 20, 31] {
+            let store = ChunkedStore::new(store.clone(), chunk_size);
+            let mut s = match store.get(&location).await.unwrap() {
+                GetResult::Stream(s) => s,
+                _ => unreachable!(),
+            };
+
+            let mut remaining = 1001;
+            while let Some(next) = s.next().await {
+                let size = next.unwrap().len();
+                let expected = remaining.min(chunk_size);
+                assert_eq!(size, expected);
+                remaining -= expected;
+            }
+            assert_eq!(remaining, 0);
+        }
+    }
+}
diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs
index 186da2ad6215..9ae634c0c193 100644
--- a/datafusion/core/src/physical_plan/file_format/csv.rs
+++ b/datafusion/core/src/physical_plan/file_format/csv.rs
@@ -25,6 +25,7 @@ use crate::physical_plan::{
 };
 
 use crate::datasource::listing::FileRange;
+use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileStream, FormatReader, ReaderFuture,
 };
@@ -167,12 +168,12 @@ struct CsvConfig {
 }
 
 impl CsvConfig {
-    fn open<R: std::io::Read>(&self, reader: R) -> csv::Reader<R> {
+    fn open<R: std::io::Read>(&self, reader: R, first_chunk: bool) -> csv::Reader<R> {
         let datetime_format = None;
         csv::Reader::new(
             reader,
             Arc::clone(&self.file_schema),
-            self.has_header,
+            self.has_header && first_chunk,
             Some(self.delimiter),
             self.batch_size,
             None,
@@ -197,11 +198,18 @@ impl FormatReader for CsvOpener {
         Box::pin(async move {
             match store.get(&file.location).await? {
                 GetResult::File(file, _) => {
-                    Ok(futures::stream::iter(config.open(file)).boxed())
+                    Ok(futures::stream::iter(config.open(file, true)).boxed())
                 }
-                r @ GetResult::Stream(_) => {
-                    let bytes = r.bytes().await?;
-                    Ok(futures::stream::iter(config.open(bytes.reader())).boxed())
+                GetResult::Stream(s) => {
+                    let mut first_chunk = true;
+                    Ok(newline_delimited_stream(s.map_err(Into::into))
+                        .map_ok(move |bytes| {
+                            let reader = config.open(bytes.reader(), first_chunk);
+                            first_chunk = false;
+                            futures::stream::iter(reader)
+                        })
+                        .try_flatten()
+                        .boxed())
                 }
             }
         })
@@ -249,12 +257,14 @@ pub async fn plan_to_csv(
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::*;
     use crate::test::partitioned_csv_config;
     use crate::test_util::aggr_test_schema_with_missing_col;
     use crate::{scalar::ScalarValue, test_util::aggr_test_schema};
     use arrow::datatypes::*;
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
     use std::fs::File;
     use std::io::Write;
     use tempfile::TempDir;
@@ -441,6 +451,38 @@ mod tests {
         Ok(schema)
     }
 
+    #[tokio::test]
+    async fn test_chunked() {
+        let ctx = SessionContext::new();
+        let chunk_sizes = [10, 20, 30, 40];
+
+        for chunk_size in chunk_sizes {
+            ctx.runtime_env().register_object_store(
+                "file",
+                "",
+                Arc::new(ChunkedStore::new(
+                    Arc::new(LocalFileSystem::new()),
+                    chunk_size,
+                )),
+            );
+
+            let task_ctx = ctx.task_ctx();
+
+            let filename = "aggregate_test_100.csv";
+            let file_schema = aggr_test_schema();
+            let config =
+                partitioned_csv_config(filename, file_schema.clone(), 1).unwrap();
+            let csv = CsvExec::new(config, true, b',');
+
+            let it = csv.execute(0, task_ctx).unwrap();
+            let batches: Vec<_> = it.try_collect().await.unwrap();
+
+            let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+
+            assert_eq!(total_rows, 100);
+        }
+    }
+
     #[tokio::test]
     async fn write_csv_results() -> Result<()> {
         // create partitioned input file and context
diff --git a/datafusion/core/src/physical_plan/file_format/delimited_stream.rs b/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
new file mode 100644
index 000000000000..1b6e30a9464e
--- /dev/null
+++ b/datafusion/core/src/physical_plan/file_format/delimited_stream.rs
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::{DataFusionError, Result};
+use bytes::Bytes;
+use futures::{Stream, StreamExt};
+use std::collections::VecDeque;
+
+/// The ASCII encoding of `"`
+const QUOTE: u8 = b'"';
+
+/// The ASCII encoding of `\n`
+const NEWLINE: u8 = b'\n';
+
+/// The ASCII encoding of `\`
+const ESCAPE: u8 = b'\\';
+
+/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an iterator
+/// of [`Bytes`] containing a whole number of new line delimited records
+#[derive(Debug, Default)]
+struct LineDelimiter {
+    /// Complete chunks of [`Bytes`]
+    complete: VecDeque<Bytes>,
+    /// Remainder bytes that form the next record
+    remainder: Vec<u8>,
+    /// True if the last character was the escape character
+    is_escape: bool,
+    /// True if currently processing a quoted string
+    is_quote: bool,
+}
+
+impl LineDelimiter {
+    /// Creates a new [`LineDelimiter`] with the provided delimiter
+    fn new() -> Self {
+        Self::default()
+    }
+
+    /// Adds the next set of [`Bytes`]
+    fn push(&mut self, val: impl Into<Bytes>) {
+        let val: Bytes = val.into();
+
+        let is_escape = &mut self.is_escape;
+        let is_quote = &mut self.is_quote;
+        let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| {
+            if *is_escape {
+                *is_escape = false;
+                None
+            } else if *v == ESCAPE {
+                *is_escape = true;
+                None
+            } else if *v == QUOTE {
+                *is_quote = !*is_quote;
+                None
+            } else if *is_quote {
+                None
+            } else {
+                (*v == NEWLINE).then(|| idx + 1)
+            }
+        });
+
+        let start_offset = match self.remainder.is_empty() {
+            true => 0,
+            false => match record_ends.next() {
+                Some(idx) => {
+                    self.remainder.extend_from_slice(&val[0..idx]);
+                    self.complete
+                        .push_back(Bytes::from(std::mem::take(&mut self.remainder)));
+                    idx
+                }
+                None => {
+                    self.remainder.extend_from_slice(&val);
+                    return;
+                }
+            },
+        };
+        let end_offset = record_ends.last().unwrap_or(start_offset);
+        if start_offset != end_offset {
+            self.complete.push_back(val.slice(start_offset..end_offset));
+        }
+
+        if end_offset != val.len() {
+            self.remainder.extend_from_slice(&val[end_offset..])
+        }
+    }
+
+    /// Marks the end of the stream, delimiting any remaining bytes
+    ///
+    /// Returns `true` if there is no remaining data to be read
+    fn finish(&mut self) -> Result<bool> {
+        if !self.remainder.is_empty() {
+            if self.is_quote {
+                return Err(DataFusionError::Execution(
+                    "encountered unterminated string".to_string(),
+                ));
+            }
+
+            if self.is_escape {
+                return Err(DataFusionError::Execution(
+                    "encountered trailing escape character".to_string(),
+                ));
+            }
+
+            self.complete
+                .push_back(Bytes::from(std::mem::take(&mut self.remainder)))
+        }
+        Ok(self.complete.is_empty())
+    }
+}
+
+impl Iterator for LineDelimiter {
+    type Item = Bytes;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.complete.pop_front()
+    }
+}
+
+/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each
+/// yielded [`Bytes`] contains a whole number of new line delimited records
+/// accounting for `\` style escapes and `"` quotes
+pub fn newline_delimited_stream<S>(s: S) -> impl Stream<Item = Result<Bytes>>
+where
+    S: Stream<Item = Result<Bytes>> + Unpin,
+{
+    let delimiter = LineDelimiter::new();
+
+    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
+        loop {
+            if let Some(next) = delimiter.next() {
+                return Some((Ok(next), (s, delimiter)));
+            }
+
+            match s.next().await {
+                Some(Ok(bytes)) => delimiter.push(bytes),
+                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
+                None => match delimiter.finish() {
+                    Ok(true) => return None,
+                    Ok(false) => continue,
+                    Err(e) => return Some((Err(e), (s, delimiter))),
+                },
+            }
+        }
+    })
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::stream::TryStreamExt;
+
+    #[test]
+    fn test_delimiter() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("hello\nworld");
+        delimiter.push("\n\n");
+
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("world\n"));
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\n"));
+        assert!(delimiter.next().is_none());
+    }
+
+    #[test]
+    fn test_delimiter_escaped() {
+        let mut delimiter = LineDelimiter::new();
+        delimiter.push("");
+        delimiter.push("fo\\\n\"foo");
+        delimiter.push("bo\n\"bar\n");
+        delimiter.push("\"he");
+        delimiter.push("llo\"\n");
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("fo\\\n\"foobo\n\"bar\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("\"hello\"\n"));
+        assert!(delimiter.next().is_none());
+
+        // Verify can push further data
+        delimiter.push("\"foo\nbar\",\"fiz\\\"inner\\\"\"\nhello");
+        assert!(!delimiter.finish().unwrap());
+
+        assert_eq!(
+            delimiter.next().unwrap(),
+            Bytes::from("\"foo\nbar\",\"fiz\\\"inner\\\"\"\n")
+        );
+        assert_eq!(delimiter.next().unwrap(), Bytes::from("hello"));
+        assert!(delimiter.finish().unwrap());
+        assert!(delimiter.next().is_none());
+    }
+
+    #[tokio::test]
+    async fn test_delimiter_stream() {
+        let input = vec!["hello\nworld\nbin", "go\ncup", "cakes"];
+        let input_stream =
+            futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
+        let stream = newline_delimited_stream(input_stream);
+
+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+}
diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs
index 385ac427c7a9..bdea9515ee11 100644
--- a/datafusion/core/src/physical_plan/file_format/json.rs
+++ b/datafusion/core/src/physical_plan/file_format/json.rs
@@ -23,6 +23,7 @@ use crate::error::{DataFusionError, Result};
 use crate::execution::context::SessionState;
 use crate::execution::context::TaskContext;
 use crate::physical_plan::expressions::PhysicalSortExpr;
+use crate::physical_plan::file_format::delimited_stream::newline_delimited_stream;
 use crate::physical_plan::file_format::file_stream::{
     FileStream, FormatReader, ReaderFuture,
 };
@@ -163,12 +164,18 @@ impl FormatReader for JsonOpener {
                     let reader = json::Reader::new(file, schema.clone(), options);
                     Ok(futures::stream::iter(reader).boxed())
                 }
-                r @ GetResult::Stream(_) => {
-                    let bytes = r.bytes().await?;
-                    let reader =
-                        json::Reader::new(bytes.reader(), schema.clone(), options);
-
-                    Ok(futures::stream::iter(reader).boxed())
+                GetResult::Stream(s) => {
+                    Ok(newline_delimited_stream(s.map_err(Into::into))
+                        .map_ok(move |bytes| {
+                            let reader = json::Reader::new(
+                                bytes.reader(),
+                                schema.clone(),
+                                options.clone(),
+                            );
+                            futures::stream::iter(reader)
+                        })
+                        .try_flatten()
+                        .boxed())
                 }
             }
         })
@@ -218,10 +225,13 @@ mod tests {
     use arrow::array::Array;
     use arrow::datatypes::{Field, Schema};
     use futures::StreamExt;
+    use object_store::local::LocalFileSystem;
 
+    use crate::assert_batches_eq;
     use crate::datasource::file_format::{json::JsonFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::prelude::NdJsonReadOptions;
     use crate::prelude::*;
     use crate::test::object_store::local_unpartitioned_file;
@@ -433,4 +443,38 @@ mod tests {
 
         Ok(())
     }
+
+    #[tokio::test]
+    async fn test_chunked() {
+        let mut ctx = SessionContext::new();
+
+        for chunk_size in [10, 20, 30, 40] {
+            ctx.runtime_env().register_object_store(
+                "file",
+                "",
+                Arc::new(ChunkedStore::new(
+                    Arc::new(LocalFileSystem::new()),
+                    chunk_size,
+                )),
+            );
+
+            let path = format!("{}/1.json", TEST_DATA_BASE);
+            let frame = ctx.read_json(path, Default::default()).await.unwrap();
+            let results = frame.collect().await.unwrap();
+
+            assert_batches_eq!(
+                &[
+                    "+-----+----------------+---------------+------+",
+                    "| a   | b              | c             | d    |",
+                    "+-----+----------------+---------------+------+",
+                    "| 1   | [2, 1.3, -6.1] | [false, true] | 4    |",
+                    "| -10 | [2, 1.3, -6.1] | [true, true]  | 4    |",
+                    "| 2   | [2, , -6.1]    | [false, ]     | text |",
+                    "|     |                |               |      |",
+                    "+-----+----------------+---------------+------+",
+                ],
+                &results
+            );
+        }
+    }
 }
diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs
index e59e8248f8ea..3ea520b2cc94 100644
--- a/datafusion/core/src/physical_plan/file_format/mod.rs
+++ b/datafusion/core/src/physical_plan/file_format/mod.rs
@@ -18,7 +18,10 @@
 //! Execution plans that read file formats
 
 mod avro;
+#[cfg(test)]
+mod chunked_store;
 mod csv;
+mod delimited_stream;
 mod file_stream;
 mod json;
 mod parquet;
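
As a usage illustration, not part of the patch: a minimal sketch of an extra test that could sit next to the existing cases in the new `delimited_stream.rs` module, exercising the quote handling its doc comments describe. A newline inside a quoted field must not end a record, even when the opening and closing quotes arrive in different chunks. The test name and input data are hypothetical; it assumes the module's existing test imports (`use super::*;` and `futures::stream::TryStreamExt`).

#[tokio::test]
async fn test_quoted_newline_across_chunks() {
    // The quote opens in the first chunk and closes in the second, so the
    // embedded newline must not be treated as a record boundary.
    let chunks = vec!["a,\"multi\n", "line\"\nb,c", "d\n"];
    let input = futures::stream::iter(chunks.into_iter().map(|s| Ok(Bytes::from(s))));

    let output: Vec<_> = newline_delimited_stream(input)
        .try_collect()
        .await
        .unwrap();

    // Each yielded chunk holds a whole number of records, realigned to newlines
    // regardless of where the input chunk boundaries fell.
    assert_eq!(
        output,
        vec![Bytes::from("a,\"multi\nline\"\n"), Bytes::from("b,cd\n")]
    );
}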