From 24f5960f0dd500c190cd5c1e90c605a96351d934 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 4 Mar 2022 01:13:32 -0500 Subject: [PATCH 1/3] Add write_csv to DataFrame --- datafusion/src/dataframe.rs | 3 + datafusion/src/execution/context.rs | 35 +---- datafusion/src/execution/dataframe_impl.rs | 8 ++ .../src/physical_plan/file_format/csv.rs | 134 +++++++++++++++++- .../src/physical_plan/file_format/mod.rs | 2 +- 5 files changed, 146 insertions(+), 36 deletions(-) diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index c8c5dcc1c5e6..dfbbc610abad 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync { /// # } /// ``` fn except(&self, dataframe: Arc) -> Result>; + + /// Write a `DataFrame` to a CSV file. + async fn write_csv(&self, path: &str) -> Result<()>; } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 3017660f13e7..d410bb55119c 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -49,7 +49,7 @@ use std::{fs, path::PathBuf}; use futures::{StreamExt, TryStreamExt}; use tokio::task::{self, JoinHandle}; -use arrow::{csv, datatypes::SchemaRef}; +use arrow::datatypes::SchemaRef; use crate::catalog::{ catalog::{CatalogProvider, MemoryCatalogProvider}, @@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition; use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv}; use crate::logical_plan::plan::Explain; +use crate::physical_plan::file_format::plan_to_csv; use crate::physical_plan::planner::DefaultPhysicalPlanner; use crate::physical_plan::udf::ScalarUDF; use crate::physical_plan::ExecutionPlan; @@ -717,37 +718,7 @@ impl ExecutionContext { plan: Arc, path: impl AsRef, ) -> Result<()> { - let path = path.as_ref(); - // create directory to contain the CSV files (one per partition) - let fs_path = Path::new(path); - let runtime = self.runtime_env(); - match fs::create_dir(fs_path) { - Ok(()) => { - let mut tasks = vec![]; - for i in 0..plan.output_partitioning().partition_count() { - let plan = plan.clone(); - let filename = format!("part-{}.csv", i); - let path = fs_path.join(&filename); - let file = fs::File::create(path)?; - let mut writer = csv::Writer::new(file); - let stream = plan.execute(i, runtime.clone()).await?; - let handle: JoinHandle> = task::spawn(async move { - stream - .map(|batch| writer.write(&batch?)) - .try_collect() - .await - .map_err(DataFusionError::from) - }); - tasks.push(handle); - } - futures::future::join_all(tasks).await; - Ok(()) - } - Err(e) => Err(DataFusionError::Execution(format!( - "Could not create directory {}: {:?}", - path, e - ))), - } + plan_to_csv(&self, plan, path).await } /// Executes a query and writes the results to a partitioned Parquet file. diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 0e3cc61f3b5a..ccce463b42df 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -39,6 +39,7 @@ use crate::{ use crate::arrow::util::pretty; use crate::datasource::TableProvider; use crate::datasource::TableType; +use crate::physical_plan::file_format::plan_to_csv; use crate::physical_plan::{ execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream, }; @@ -67,6 +68,13 @@ impl DataFrameImpl { let plan = ctx.optimize(&self.plan)?; ctx.create_physical_plan(&plan).await } + + async fn write_csv(&self, path: impl AsRef) -> Result<()> { + let plan = self.create_physical_plan().await?; + let state = self.ctx_state.lock().clone(); + let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); + plan_to_csv(&ctx, plan, path).await + } } #[async_trait] diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 709705b5066d..0aab5fb7617b 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -18,18 +18,22 @@ //! Execution plan for reading CSV files use crate::error::{DataFusionError, Result}; +use crate::execution::context::ExecutionContext; use crate::physical_plan::expressions::PhysicalSortExpr; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; +use crate::execution::runtime_env::RuntimeEnv; use arrow::csv; use arrow::datatypes::SchemaRef; +use async_trait::async_trait; +use futures::{StreamExt, TryStreamExt}; use std::any::Any; +use std::fs; +use std::path::Path; use std::sync::Arc; - -use crate::execution::runtime_env::RuntimeEnv; -use async_trait::async_trait; +use tokio::task::{self, JoinHandle}; use super::file_stream::{BatchIter, FileStream}; use super::FileScanConfig; @@ -176,16 +180,57 @@ impl ExecutionPlan for CsvExec { } } +pub(crate) async fn plan_to_csv( + context: &ExecutionContext, + plan: Arc, + path: impl AsRef, +) -> Result<()> { + let path = path.as_ref(); + // create directory to contain the CSV files (one per partition) + let fs_path = Path::new(path); + let runtime = context.runtime_env(); + match fs::create_dir(fs_path) { + Ok(()) => { + let mut tasks = vec![]; + for i in 0..plan.output_partitioning().partition_count() { + let plan = plan.clone(); + let filename = format!("part-{}.csv", i); + let path = fs_path.join(&filename); + let file = fs::File::create(path)?; + let mut writer = csv::Writer::new(file); + let stream = plan.execute(i, runtime.clone()).await?; + let handle: JoinHandle> = task::spawn(async move { + stream + .map(|batch| writer.write(&batch?)) + .try_collect() + .await + .map_err(DataFusionError::from) + }); + tasks.push(handle); + } + futures::future::join_all(tasks).await; + Ok(()) + } + Err(e) => Err(DataFusionError::Execution(format!( + "Could not create directory {}: {:?}", + path, e + ))), + } +} + #[cfg(test)] mod tests { use super::*; + use crate::prelude::*; use crate::test_util::aggr_test_schema_with_missing_col; use crate::{ datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem}, scalar::ScalarValue, test_util::aggr_test_schema, }; + use arrow::datatypes::*; use futures::StreamExt; + use tempfile::TempDir; #[tokio::test] async fn csv_exec_with_projection() -> Result<()> { @@ -376,4 +421,87 @@ mod tests { crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]); Ok(()) } + + /// Generate CSV partitions within the supplied directory + fn populate_csv_partitions( + tmp_dir: &TempDir, + partition_count: usize, + file_extension: &str, + ) -> Result { + // define schema for data source (csv file) + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt64, false), + Field::new("c3", DataType::Boolean, false), + ])); + + // generate a partitioned file + for partition in 0..partition_count { + let filename = format!("partition-{}.{}", partition, file_extension); + let file_path = tmp_dir.path().join(&filename); + let mut file = File::create(file_path)?; + + // generate some data + for i in 0..=10 { + let data = format!("{},{},{}\n", partition, i, i % 2 == 0); + file.write_all(data.as_bytes())?; + } + } + + Ok(schema) + } + + #[tokio::test] + async fn write_csv_results() -> Result<()> { + // create partitioned input file and context + let tmp_dir = TempDir::new()?; + let mut ctx = ExecutionContext::with_config( + ExecutionConfig::new().with_target_partitions(8), + ); + + let schema = populate_csv_partitions(&tmp_dir, 8, ".csv")?; + + // register csv file with the execution context + ctx.register_csv( + "test", + tmp_dir.path().to_str().unwrap(), + CsvReadOptions::new().schema(&schema), + ) + .await?; + + // execute a simple query and write the results to CSV + let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out"; + let df = ctx.sql("SELECT c1, c2 FROM test").await?; + df.write_csv(&out_dir).await?; + + // create a new context and verify that the results were saved to a partitioned csv file + let mut ctx = ExecutionContext::new(); + + let schema = Arc::new(Schema::new(vec![ + Field::new("c1", DataType::UInt32, false), + Field::new("c2", DataType::UInt64, false), + ])); + + // register each partition as well as the top level dir + let csv_read_option = CsvReadOptions::new().schema(&schema); + ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option) + .await?; + ctx.register_csv("allparts", &out_dir, csv_read_option) + .await?; + + let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?; + let allparts = ctx + .sql("SELECT c1, c2 FROM allparts") + .await? + .collect() + .await?; + + let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum(); + + assert_eq!(part0[0].schema(), allparts[0].schema()); + + assert_eq!(allparts_count, 40); + + Ok(()) + } } diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 7658addd3561..3053dfd3cfe3 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -32,7 +32,7 @@ use arrow::{ record_batch::RecordBatch, }; pub use avro::AvroExec; -pub use csv::CsvExec; +pub use csv::{plan_to_csv, CsvExec}; pub use json::NdJsonExec; use crate::error::DataFusionError; From 9fb5d7701f3965cda47cb18a92782587433243d7 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 4 Mar 2022 01:32:47 -0500 Subject: [PATCH 2/3] Cleanup --- datafusion/src/dataframe.rs | 2 +- datafusion/src/execution/dataframe_impl.rs | 14 +++++++------- datafusion/src/physical_plan/file_format/csv.rs | 4 +++- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index dfbbc610abad..cf4e93315bea 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -407,5 +407,5 @@ pub trait DataFrame: Send + Sync { fn except(&self, dataframe: Arc) -> Result>; /// Write a `DataFrame` to a CSV file. - async fn write_csv(&self, path: &str) -> Result<()>; + async fn write_csv(&self, path: impl AsRef) -> Result<()>; } diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index ccce463b42df..3583d2e6fa9b 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -68,13 +68,6 @@ impl DataFrameImpl { let plan = ctx.optimize(&self.plan)?; ctx.create_physical_plan(&plan).await } - - async fn write_csv(&self, path: impl AsRef) -> Result<()> { - let plan = self.create_physical_plan().await?; - let state = self.ctx_state.lock().clone(); - let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); - plan_to_csv(&ctx, plan, path).await - } } #[async_trait] @@ -321,6 +314,13 @@ impl DataFrame for DataFrameImpl { &LogicalPlanBuilder::except(left_plan, right_plan, true)?, ))) } + + async fn write_csv(&self, path: impl AsRef) -> Result<()> { + let plan = self.create_physical_plan().await?; + let state = self.ctx_state.lock().clone(); + let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); + plan_to_csv(&ctx, plan, path).await + } } #[cfg(test)] diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 0aab5fb7617b..8dcb40815757 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -180,7 +180,7 @@ impl ExecutionPlan for CsvExec { } } -pub(crate) async fn plan_to_csv( +pub async fn plan_to_csv( context: &ExecutionContext, plan: Arc, path: impl AsRef, @@ -230,6 +230,8 @@ mod tests { }; use arrow::datatypes::*; use futures::StreamExt; + use std::fs::File; + use std::io::Write; use tempfile::TempDir; #[tokio::test] From 067a177bfc29a711db65edc6b4248af75b323a56 Mon Sep 17 00:00:00 2001 From: Matthew Turner Date: Fri, 4 Mar 2022 22:04:06 -0500 Subject: [PATCH 3/3] Update write_csv signature --- datafusion/src/dataframe.rs | 2 +- datafusion/src/execution/context.rs | 2 +- datafusion/src/execution/dataframe_impl.rs | 2 +- datafusion/src/physical_plan/file_format/csv.rs | 2 +- datafusion/src/physical_plan/file_format/mod.rs | 3 ++- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs index cf4e93315bea..dfbbc610abad 100644 --- a/datafusion/src/dataframe.rs +++ b/datafusion/src/dataframe.rs @@ -407,5 +407,5 @@ pub trait DataFrame: Send + Sync { fn except(&self, dataframe: Arc) -> Result>; /// Write a `DataFrame` to a CSV file. - async fn write_csv(&self, path: impl AsRef) -> Result<()>; + async fn write_csv(&self, path: &str) -> Result<()>; } diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index d410bb55119c..a554a7c8bb71 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -718,7 +718,7 @@ impl ExecutionContext { plan: Arc, path: impl AsRef, ) -> Result<()> { - plan_to_csv(&self, plan, path).await + plan_to_csv(self, plan, path).await } /// Executes a query and writes the results to a partitioned Parquet file. diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 3583d2e6fa9b..c00729be0406 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -315,7 +315,7 @@ impl DataFrame for DataFrameImpl { ))) } - async fn write_csv(&self, path: impl AsRef) -> Result<()> { + async fn write_csv(&self, path: &str) -> Result<()> { let plan = self.create_physical_plan().await?; let state = self.ctx_state.lock().clone(); let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index 8dcb40815757..d9f4706fdf0b 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -502,7 +502,7 @@ mod tests { assert_eq!(part0[0].schema(), allparts[0].schema()); - assert_eq!(allparts_count, 40); + assert_eq!(allparts_count, 80); Ok(()) } diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 3053dfd3cfe3..baa58a01d164 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -32,7 +32,8 @@ use arrow::{ record_batch::RecordBatch, }; pub use avro::AvroExec; -pub use csv::{plan_to_csv, CsvExec}; +pub(crate) use csv::plan_to_csv; +pub use csv::CsvExec; pub use json::NdJsonExec; use crate::error::DataFusionError;