Merged
3 changes: 3 additions & 0 deletions datafusion/src/dataframe.rs
@@ -405,4 +405,7 @@ pub trait DataFrame: Send + Sync {
/// # }
/// ```
fn except(&self, dataframe: Arc<dyn DataFrame>) -> Result<Arc<dyn DataFrame>>;

/// Write a `DataFrame` to a partitioned CSV file, one file per output partition.
async fn write_csv(&self, path: &str) -> Result<()>;
}
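
A minimal usage sketch of the new trait method (the table name, input path, and output directory below are illustrative, not from this PR; `write_csv` creates the target directory and writes one `part-N.csv` file per output partition):

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let mut ctx = ExecutionContext::new();
    // assumes an existing CSV file at this path
    ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())
        .await?;
    let df = ctx.sql("SELECT * FROM example").await?;
    // creates /tmp/out and writes part-0.csv, part-1.csv, ...
    df.write_csv("/tmp/out").await?;
    Ok(())
}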
35 changes: 3 additions & 32 deletions datafusion/src/execution/context.rs
@@ -49,7 +49,7 @@ use std::{fs, path::PathBuf};
use futures::{StreamExt, TryStreamExt};
use tokio::task::{self, JoinHandle};

use arrow::datatypes::SchemaRef;

use crate::catalog::{
catalog::{CatalogProvider, MemoryCatalogProvider},
@@ -80,6 +80,7 @@ use crate::physical_optimizer::repartition::Repartition;

use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -717,37 +718,7 @@ impl ExecutionContext {
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
plan_to_csv(self, plan, path).await
}
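
For callers, `ExecutionContext::write_csv` keeps its public signature and now simply delegates to the shared `plan_to_csv` helper. A hedged sketch of the call path (the `save_query` wrapper and paths are illustrative; the `create_logical_plan`/`optimize`/`create_physical_plan` signatures are assumed to match this revision of DataFusion):

use datafusion::error::Result;
use datafusion::prelude::*;

async fn save_query(ctx: &mut ExecutionContext, sql: &str, out_dir: &str) -> Result<()> {
    let logical = ctx.create_logical_plan(sql)?;
    let optimized = ctx.optimize(&logical)?;
    let physical = ctx.create_physical_plan(&optimized).await?;
    // delegates to plan_to_csv internally
    ctx.write_csv(physical, out_dir).await
}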

/// Executes a query and writes the results to a partitioned Parquet file.
8 changes: 8 additions & 0 deletions datafusion/src/execution/dataframe_impl.rs
@@ -39,6 +39,7 @@ use crate::{
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
use crate::physical_plan::file_format::plan_to_csv;
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan, SendableRecordBatchStream,
};
@@ -313,6 +314,13 @@ impl DataFrame for DataFrameImpl {
&LogicalPlanBuilder::except(left_plan, right_plan, true)?,
)))
}

async fn write_csv(&self, path: &str) -> Result<()> {
let plan = self.create_physical_plan().await?;
let state = self.ctx_state.lock().clone();
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_csv(&ctx, plan, path).await
}
}

#[cfg(test)]
136 changes: 133 additions & 3 deletions datafusion/src/physical_plan/file_format/csv.rs
@@ -18,18 +18,22 @@
//! Execution plan for reading CSV files

use crate::error::{DataFusionError, Result};
use crate::execution::context::ExecutionContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::physical_plan::{
DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics,
};

use crate::execution::runtime_env::RuntimeEnv;
use arrow::csv;
use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
use std::any::Any;
use std::fs;
use std::path::Path;
use std::sync::Arc;

use tokio::task::{self, JoinHandle};

use super::file_stream::{BatchIter, FileStream};
use super::FileScanConfig;
@@ -176,16 +180,59 @@ impl ExecutionPlan for CsvExec {
}
}

pub async fn plan_to_csv(
context: &ExecutionContext,
plan: Arc<dyn ExecutionPlan>,
path: impl AsRef<str>,
) -> Result<()> {
let path = path.as_ref();
// create directory to contain the CSV files (one per partition)
let fs_path = Path::new(path);
let runtime = context.runtime_env();
match fs::create_dir(fs_path) {
Ok(()) => {
let mut tasks = vec![];
for i in 0..plan.output_partitioning().partition_count() {
let plan = plan.clone();
let filename = format!("part-{}.csv", i);
let path = fs_path.join(&filename);
let file = fs::File::create(path)?;
let mut writer = csv::Writer::new(file);
let stream = plan.execute(i, runtime.clone()).await?;
let handle: JoinHandle<Result<()>> = task::spawn(async move {
stream
.map(|batch| writer.write(&batch?))
.try_collect()
.await
.map_err(DataFusionError::from)
});
tasks.push(handle);
}
futures::future::join_all(tasks).await;
Ok(())
}
Err(e) => Err(DataFusionError::Execution(format!(
"Could not create directory {}: {:?}",
path, e
))),
}
}
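
Note that `join_all` above discards each task's `Result`, so an error raised inside a spawned writer task does not propagate to the caller (only failures before `spawn`, such as `File::create`, do). A hedged sketch of an error-propagating variant, not part of this PR, reusing the same `tasks` vector:

// Propagate the first error from any writer task.
for result in futures::future::join_all(tasks).await {
    // outer Result: JoinError (panic/cancellation); inner: the task's own Result
    result.map_err(|e| DataFusionError::Execution(e.to_string()))??;
}
Ok(())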

#[cfg(test)]
mod tests {
use super::*;
use crate::prelude::*;
use crate::test_util::aggr_test_schema_with_missing_col;
use crate::{
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
test_util::aggr_test_schema,
};
use arrow::datatypes::*;
use futures::StreamExt;
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;

#[tokio::test]
async fn csv_exec_with_projection() -> Result<()> {
@@ -376,4 +423,87 @@ mod tests {
crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]);
Ok(())
}

/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
partition_count: usize,
file_extension: &str,
) -> Result<SchemaRef> {
// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
Field::new("c3", DataType::Boolean, false),
]));

// generate a partitioned file
for partition in 0..partition_count {
let filename = format!("partition-{}.{}", partition, file_extension);
let file_path = tmp_dir.path().join(&filename);
let mut file = File::create(file_path)?;

// generate 10 rows of data per partition
for i in 0..10 {
let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
file.write_all(data.as_bytes())?;
}
}

Ok(schema)
}
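
For reference, each partition file produced by the loop above holds ten rows; partition 0 starts with lines like:

0,0,true
0,1,false
0,2,true

so the eight input partitions in the test below contain 8 × 10 = 80 rows in total, which is what the final assertion in `write_csv_results` checks.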

#[tokio::test]
async fn write_csv_results() -> Result<()> {
// create partitioned input file and context
let tmp_dir = TempDir::new()?;
let mut ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_target_partitions(8),
);

let schema = populate_csv_partitions(&tmp_dir, 8, "csv")?;

// register csv file with the execution context
ctx.register_csv(
"test",
tmp_dir.path().to_str().unwrap(),
CsvReadOptions::new().schema(&schema),
)
.await?;

// execute a simple query and write the results to CSV
let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
let df = ctx.sql("SELECT c1, c2 FROM test").await?;
df.write_csv(&out_dir).await?;

// create a new context and verify that the results were saved to the partitioned CSV files
let mut ctx = ExecutionContext::new();

let schema = Arc::new(Schema::new(vec![
Field::new("c1", DataType::UInt32, false),
Field::new("c2", DataType::UInt64, false),
]));

// register each partition as well as the top level dir
let csv_read_option = CsvReadOptions::new().schema(&schema);
ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
.await?;
ctx.register_csv("allparts", &out_dir, csv_read_option)
.await?;

let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
let allparts = ctx
.sql("SELECT c1, c2 FROM allparts")
.await?
.collect()
.await?;

let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();

assert_eq!(part0[0].schema(), allparts[0].schema());

assert_eq!(allparts_count, 80);

Ok(())
}
}
1 change: 1 addition & 0 deletions datafusion/src/physical_plan/file_format/mod.rs
@@ -32,6 +32,7 @@ use arrow::{
record_batch::RecordBatch,
};
pub use avro::AvroExec;
pub(crate) use csv::plan_to_csv;
pub use csv::CsvExec;
pub use json::NdJsonExec;
