diff --git a/datafusion-examples/Cargo.toml b/datafusion-examples/Cargo.toml index f53316d00bd9..b7bf1161f868 100644 --- a/datafusion-examples/Cargo.toml +++ b/datafusion-examples/Cargo.toml @@ -39,6 +39,7 @@ arrow = { workspace = true } arrow-flight = { workspace = true } arrow-schema = { workspace = true } async-trait = "0.1.41" +bytes = "1.4" dashmap = "5.4" datafusion = { path = "../datafusion/core" } datafusion-common = { path = "../datafusion/common" } diff --git a/datafusion-examples/examples/csv_opener.rs b/datafusion-examples/examples/csv_opener.rs new file mode 100644 index 000000000000..0143b0297e22 --- /dev/null +++ b/datafusion-examples/examples/csv_opener.rs @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{sync::Arc, vec}; + +use datafusion::{ + assert_batches_eq, + datasource::{ + file_format::file_type::FileCompressionType, listing::PartitionedFile, + object_store::ObjectStoreUrl, + }, + error::Result, + physical_plan::{ + file_format::{CsvConfig, CsvOpener, FileScanConfig, FileStream}, + metrics::ExecutionPlanMetricsSet, + }, + test_util::aggr_test_schema, +}; +use futures::StreamExt; +use object_store::local::LocalFileSystem; + +/// This example demonstrates a scanning against an Arrow data source (CSV) and +/// fetching results +#[tokio::main] +async fn main() -> Result<()> { + let object_store = Arc::new(LocalFileSystem::new()); + let schema = aggr_test_schema(); + + let config = CsvConfig::new( + 8192, + schema.clone(), + Some(vec![12, 0]), + true, + b',', + object_store, + ); + + let opener = CsvOpener::new(Arc::new(config), FileCompressionType::UNCOMPRESSED); + + let testdata = datafusion::test_util::arrow_test_data(); + let path = format!("{testdata}/csv/aggregate_test_100.csv"); + + let path = std::path::Path::new(&path).canonicalize()?; + + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new(path.display().to_string(), 10)]], + statistics: Default::default(), + projection: Some(vec![12, 0]), + limit: Some(5), + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + + let result = + FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) + .unwrap() + .map(|b| b.unwrap()) + .collect::<Vec<_>>() + .await; + assert_batches_eq!( + &[ + "+--------------------------------+----+", + "| c13 | c1 |", + "+--------------------------------+----+", + "| 6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW | c |", + "| C2GT5KVyOPZpgKVl110TyZO0NcJ434 | d |", + "| AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz | b |", + "| 0keZ5G8BffGwgF2RwQD59TFzMStxCB | a |", + "| Ig1QcuKsjHXkproePdERo2w0mYzIqd | b |", + 
"+--------------------------------+----+", + ], + &result + ); + Ok(()) +} diff --git a/datafusion-examples/examples/json_opener.rs b/datafusion-examples/examples/json_opener.rs new file mode 100644 index 000000000000..843bed4f61ee --- /dev/null +++ b/datafusion-examples/examples/json_opener.rs @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{sync::Arc, vec}; + +use arrow_schema::{DataType, Field, Schema}; +use datafusion::{ + assert_batches_eq, + datasource::{ + file_format::file_type::FileCompressionType, listing::PartitionedFile, + object_store::ObjectStoreUrl, + }, + error::Result, + physical_plan::{ + file_format::{FileScanConfig, FileStream, JsonOpener}, + metrics::ExecutionPlanMetricsSet, + }, +}; +use futures::StreamExt; +use object_store::ObjectStore; + +/// This example demonstrates a scanning against an Arrow data source (JSON) and +/// fetching results +#[tokio::main] +async fn main() -> Result<()> { + let object_store = object_store::memory::InMemory::new(); + let path = object_store::path::Path::from("demo.json"); + let data = bytes::Bytes::from( + r#"{"num":5,"str":"test"} + {"num":2,"str":"hello"} + {"num":4,"str":"foo"}"#, + ); + object_store.put(&path, data).await.unwrap(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("num", DataType::Int64, false), + Field::new("str", DataType::Utf8, false), + ])); + + let projected = Arc::new(schema.clone().project(&[1, 0])?); + + let opener = JsonOpener::new( + 8192, + projected, + FileCompressionType::UNCOMPRESSED, + Arc::new(object_store), + ); + + let scan_config = FileScanConfig { + object_store_url: ObjectStoreUrl::local_filesystem(), + file_schema: schema.clone(), + file_groups: vec![vec![PartitionedFile::new(path.to_string(), 10)]], + statistics: Default::default(), + projection: Some(vec![1, 0]), + limit: Some(5), + table_partition_cols: vec![], + output_ordering: None, + infinite_source: false, + }; + + let result = + FileStream::new(&scan_config, 0, opener, &ExecutionPlanMetricsSet::new()) + .unwrap() + .map(|b| b.unwrap()) + .collect::<Vec<_>>() + .await; + assert_batches_eq!( + &[ + "+-------+-----+", + "| str | num |", + "+-------+-----+", + "| test | 5 |", + "| hello | 2 |", + "| foo | 4 |", + "+-------+-----+", + ], + &result + ); + Ok(()) +} diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs 
b/datafusion/core/src/physical_plan/file_format/csv.rs index 9826d32d0f7a..8c8d89e38fd4 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -188,8 +188,9 @@ impl ExecutionPlan for CsvExec { } } +/// A Config for [`CsvOpener`] #[derive(Debug, Clone)] -struct CsvConfig { +pub struct CsvConfig { batch_size: usize, file_schema: SchemaRef, file_projection: Option<Vec<usize>>, @@ -198,6 +199,27 @@ struct CsvConfig { object_store: Arc<dyn ObjectStore>, } +impl CsvConfig { + /// Returns a [`CsvConfig`] + pub fn new( + batch_size: usize, + file_schema: SchemaRef, + file_projection: Option<Vec<usize>>, + has_header: bool, + delimiter: u8, + object_store: Arc<dyn ObjectStore>, + ) -> Self { + Self { + batch_size, + file_schema, + file_projection, + has_header, + delimiter, + object_store, + } + } +} + impl CsvConfig { fn open<R: Read>(&self, reader: R) -> csv::Reader<R> { let datetime_format = None; @@ -228,11 +250,25 @@ impl CsvConfig { } } -struct CsvOpener { +/// A [`FileOpener`] that opens a CSV file and yields a [`FileOpenFuture`] +pub struct CsvOpener { config: Arc<CsvConfig>, file_compression_type: FileCompressionType, } +impl CsvOpener { + /// Returns a [`CsvOpener`] + pub fn new( + config: Arc<CsvConfig>, + file_compression_type: FileCompressionType, + ) -> Self { + Self { + config, + file_compression_type, + } + } +} + impl FileOpener for CsvOpener { fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { let config = self.config.clone(); diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index fb34175b85c9..95d978566662 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -162,13 +162,31 @@ impl ExecutionPlan for NdJsonExec { } } -struct JsonOpener { +/// A [`FileOpener`] that opens a JSON file and yields a [`FileOpenFuture`] +pub struct JsonOpener { batch_size: usize, projected_schema: SchemaRef, file_compression_type: FileCompressionType, 
object_store: Arc<dyn ObjectStore>, } +impl JsonOpener { + /// Returns a [`JsonOpener`] + pub fn new( + batch_size: usize, + projected_schema: SchemaRef, + file_compression_type: FileCompressionType, + object_store: Arc<dyn ObjectStore>, + ) -> Self { + Self { + batch_size, + projected_schema, + file_compression_type, + object_store, + } + } +} + impl FileOpener for JsonOpener { fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> { let store = self.object_store.clone(); diff --git a/datafusion/core/src/physical_plan/file_format/mod.rs b/datafusion/core/src/physical_plan/file_format/mod.rs index 141a737314dc..a50f3bf02543 100644 --- a/datafusion/core/src/physical_plan/file_format/mod.rs +++ b/datafusion/core/src/physical_plan/file_format/mod.rs @@ -26,7 +26,7 @@ mod json; mod parquet; pub(crate) use self::csv::plan_to_csv; -pub use self::csv::CsvExec; +pub use self::csv::{CsvConfig, CsvExec, CsvOpener}; pub(crate) use self::parquet::plan_to_parquet; pub use self::parquet::{ParquetExec, ParquetFileMetrics, ParquetFileReaderFactory}; use arrow::{ @@ -39,7 +39,7 @@ pub use avro::AvroExec; use datafusion_physical_expr::PhysicalSortExpr; pub use file_stream::{FileOpenFuture, FileOpener, FileStream}; pub(crate) use json::plan_to_json; -pub use json::NdJsonExec; +pub use json::{JsonOpener, NdJsonExec}; use crate::datasource::{ listing::{FileRange, PartitionedFile},