From 7f31fabb29b84b2c9be8ae13f30b7218cb99bbfb Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 14:28:10 +0800 Subject: [PATCH 01/13] add csv mode to datafusion cli --- datafusion-cli/Cargo.toml | 2 + datafusion-cli/src/format.rs | 1 + datafusion-cli/src/format/print_format.rs | 57 +++++++++++++++++++++++ datafusion-cli/src/main.rs | 40 ++++++++++++---- 4 files changed, 90 insertions(+), 10 deletions(-) create mode 100644 datafusion-cli/src/format.rs create mode 100644 datafusion-cli/src/format/print_format.rs diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index 883d0f2f4c66..be1bf0f478bd 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -31,3 +31,5 @@ clap = "2.33" rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion" } +arrow = { git = "https://github.com/apache/arrow-rs", rev = "508f25c10032857da34ea88cc8166f0741616a32" } +tempfile = "3.2.0" diff --git a/datafusion-cli/src/format.rs b/datafusion-cli/src/format.rs new file mode 100644 index 000000000000..60073a5aad5e --- /dev/null +++ b/datafusion-cli/src/format.rs @@ -0,0 +1 @@ +pub mod print_format; diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs new file mode 100644 index 000000000000..492435275b67 --- /dev/null +++ b/datafusion-cli/src/format/print_format.rs @@ -0,0 +1,57 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Print format variants +use arrow::csv::writer::WriterBuilder; +use datafusion::arrow::record_batch::RecordBatch; +use datafusion::arrow::util::pretty; +use datafusion::error::Result; +use std::io::{Read, Seek, SeekFrom}; +use tempfile::tempfile; + +/// Allow records to be printed in different formats +#[derive(Debug, Clone)] +pub enum PrintFormat { + Csv, + Aligned, +} + +impl PrintFormat { + /// print the batches to stdout using the specified format + pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { + match self { + PrintFormat::Csv => { + // utilizing a temp file so that we can leverage the arrow csv writer + // ideally the upstream shall take a more generic trait + let mut file = tempfile()?; + { + let builder = WriterBuilder::new().has_headers(true); + let mut writer = builder.build(&file); + batches + .iter() + .for_each(|batch| writer.write(batch).unwrap()); + } + let mut data = String::new(); + file.seek(SeekFrom::Start(0))?; + file.read_to_string(&mut data)?; + println!("{}", data); + } + PrintFormat::Aligned => pretty::print_batches(batches)?, + } + Ok(()) + } +} diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 2e8fe111b2fc..1971a3c0fd19 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -16,10 +16,13 @@ // under the License. #![allow(bare_trait_objects)] + +mod format; + use clap::{crate_version, App, Arg}; -use datafusion::arrow::util::pretty; use datafusion::error::Result; use datafusion::execution::context::{ExecutionConfig, ExecutionContext}; +use format::print_format::PrintFormat; use rustyline::Editor; use std::env; use std::fs::File; @@ -55,12 +58,18 @@ pub async fn main() { ) .arg( Arg::with_name("file") - .help("execute commands from file, then exit") + .help("Execute commands from file, then exit") .short("f") .long("file") .validator(is_valid_file) .takes_value(true), ) + .arg( + Arg::with_name("csv") + .help("Switches to CSV (Comma-Separated Values) output mode.") + .long("csv") + .takes_value(false), + ) .get_matches(); if let Some(path) = matches.value_of("data-path") { @@ -77,19 +86,26 @@ pub async fn main() { execution_config = execution_config.with_batch_size(batch_size); }; + let print_format = if matches.is_present("csv") { + PrintFormat::Csv + } else { + PrintFormat::Aligned + }; + if let Some(file_path) = matches.value_of("file") { let file = File::open(file_path) .unwrap_or_else(|err| panic!("cannot open file '{}': {}", file_path, err)); let mut reader = BufReader::new(file); - exec_from_lines(&mut reader, execution_config).await; + exec_from_lines(&mut reader, execution_config, print_format).await; } else { - exec_from_repl(execution_config).await; + exec_from_repl(execution_config, print_format).await; } } async fn exec_from_lines( reader: &mut BufReader, execution_config: ExecutionConfig, + print_format: PrintFormat, ) { let mut ctx = ExecutionContext::with_config(execution_config); let mut query = "".to_owned(); @@ -100,7 +116,7 @@ async fn exec_from_lines( let line = line.trim_end(); query.push_str(line); if line.ends_with(';') { - match exec_and_print(&mut ctx, query).await { + match exec_and_print(&mut ctx, print_format.clone(), query).await { Ok(_) => {} Err(err) => println!("{:?}", err), } @@ -117,14 +133,14 @@ async fn exec_from_lines( // run the left over query if the last statement doesn't contain ‘;’ if !query.is_empty() { - match exec_and_print(&mut ctx, query).await { + match exec_and_print(&mut ctx, print_format, query).await { Ok(_) => {} Err(err) => println!("{:?}", err), } } } -async fn exec_from_repl(execution_config: ExecutionConfig) { +async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFormat) { let mut ctx = ExecutionContext::with_config(execution_config); let mut rl = Editor::<()>::new(); @@ -139,7 +155,7 @@ async fn exec_from_repl(execution_config: ExecutionConfig) { Ok(ref line) if line.trim_end().ends_with(';') => { query.push_str(line.trim_end()); rl.add_history_entry(query.clone()); - match exec_and_print(&mut ctx, query).await { + match exec_and_print(&mut ctx, print_format.clone(), query).await { Ok(_) => {} Err(err) => println!("{:?}", err), } @@ -186,7 +202,11 @@ fn is_exit_command(line: &str) -> bool { line == "quit" || line == "exit" } -async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> { +async fn exec_and_print( + ctx: &mut ExecutionContext, + print_format: PrintFormat, + sql: String, +) -> Result<()> { let now = Instant::now(); let df = ctx.sql(&sql)?; @@ -200,7 +220,7 @@ async fn exec_and_print(ctx: &mut ExecutionContext, sql: String) -> Result<()> { return Ok(()); } - pretty::print_batches(&results)?; + print_format.print_batches(&results)?; let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); From 8390d158036f29eded66a5f14ea111521203b69c Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 14:30:58 +0800 Subject: [PATCH 02/13] add license --- datafusion-cli/src/format.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion-cli/src/format.rs b/datafusion-cli/src/format.rs index 60073a5aad5e..c5da78f17951 100644 --- a/datafusion-cli/src/format.rs +++ b/datafusion-cli/src/format.rs @@ -1 +1,17 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. pub mod print_format; From c436afd1a4d01e1a1bc03b3e726fdc28f51a2c58 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:16:10 +0800 Subject: [PATCH 03/13] fix per comments --- datafusion-cli/Cargo.toml | 1 - datafusion-cli/src/format/print_format.rs | 39 +++++++++++++---------- datafusion-cli/src/main.rs | 28 ++++++++++------ 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/datafusion-cli/Cargo.toml b/datafusion-cli/Cargo.toml index be1bf0f478bd..2cde4da16ca1 100644 --- a/datafusion-cli/Cargo.toml +++ b/datafusion-cli/Cargo.toml @@ -32,4 +32,3 @@ rustyline = "8.0" tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync"] } datafusion = { path = "../datafusion" } arrow = { git = "https://github.com/apache/arrow-rs", rev = "508f25c10032857da34ea88cc8166f0741616a32" } -tempfile = "3.2.0" diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 492435275b67..921e29f7fae7 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -19,15 +19,25 @@ use arrow::csv::writer::WriterBuilder; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; -use datafusion::error::Result; -use std::io::{Read, Seek, SeekFrom}; -use tempfile::tempfile; +use datafusion::error::{DataFusionError, Result}; +use std::str::FromStr; /// Allow records to be printed in different formats #[derive(Debug, Clone)] pub enum PrintFormat { Csv, - Aligned, + Table, +} + +impl FromStr for PrintFormat { + type Err = (); + fn from_str(s: &str) -> std::result::Result { + match s { + "csv" => Ok(PrintFormat::Csv), + "table" => Ok(PrintFormat::Table), + _ => Err(()), + } + } } impl PrintFormat { @@ -35,22 +45,19 @@ impl PrintFormat { pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { match self { PrintFormat::Csv => { - // utilizing a temp file so that we can leverage the arrow csv writer - // ideally the upstream shall take a more generic trait - let mut file = tempfile()?; + let mut bytes = vec![]; { let builder = WriterBuilder::new().has_headers(true); - let mut writer = builder.build(&file); - batches - .iter() - .for_each(|batch| writer.write(batch).unwrap()); + let mut writer = builder.build(&mut bytes); + for batch in batches { + writer.write(batch)?; + } } - let mut data = String::new(); - file.seek(SeekFrom::Start(0))?; - file.read_to_string(&mut data)?; - println!("{}", data); + let csv = String::from_utf8(bytes) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + println!("{}", csv); } - PrintFormat::Aligned => pretty::print_batches(batches)?, + PrintFormat::Table => pretty::print_batches(batches)?, } Ok(()) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 1971a3c0fd19..2ad7b39fc5e1 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -65,10 +65,12 @@ pub async fn main() { .takes_value(true), ) .arg( - Arg::with_name("csv") - .help("Switches to CSV (Comma-Separated Values) output mode.") - .long("csv") - .takes_value(false), + Arg::with_name("format") + .help("Output format") + .long("format") + .default_value("table") + .validator(is_valid_format) + .takes_value(true), ) .get_matches(); @@ -86,11 +88,11 @@ pub async fn main() { execution_config = execution_config.with_batch_size(batch_size); }; - let print_format = if matches.is_present("csv") { - PrintFormat::Csv - } else { - PrintFormat::Aligned - }; + let print_format = matches + .value_of("format") + .expect("No format is specified") + .parse::() + .expect("Invalid format"); if let Some(file_path) = matches.value_of("file") { let file = File::open(file_path) @@ -174,6 +176,14 @@ async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFo rl.save_history(".history").ok(); } +fn is_valid_format(format: String) -> std::result::Result<(), String> { + match format.to_lowercase().as_str() { + "csv" => Ok(()), + "table" => Ok(()), + _ => Err(format!("Format '{}' not supported", format)), + } +} + fn is_valid_file(dir: String) -> std::result::Result<(), String> { if Path::new(&dir).is_file() { Ok(()) From f11348679f0be1e54ada1d2987b05e1f645a2115 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:19:28 +0800 Subject: [PATCH 04/13] update help --- datafusion-cli/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 2ad7b39fc5e1..05c6766cee72 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -66,7 +66,7 @@ pub async fn main() { ) .arg( Arg::with_name("format") - .help("Output format") + .help("Output format (possible values: table, csv)") .long("format") .default_value("table") .validator(is_valid_format) From fbf83ca5bf9e5cdb0b5ff785075f0f603b5de71d Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:37:26 +0800 Subject: [PATCH 05/13] adding tsv format --- datafusion-cli/src/format/print_format.rs | 33 ++++++++++++++--------- datafusion-cli/src/main.rs | 8 +++--- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 921e29f7fae7..0793a4ebf010 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -26,6 +26,7 @@ use std::str::FromStr; #[derive(Debug, Clone)] pub enum PrintFormat { Csv, + Tsv, Table, } @@ -34,29 +35,35 @@ impl FromStr for PrintFormat { fn from_str(s: &str) -> std::result::Result { match s { "csv" => Ok(PrintFormat::Csv), + "tsv" => Ok(PrintFormat::Tsv), "table" => Ok(PrintFormat::Table), _ => Err(()), } } } +fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { + let mut bytes = vec![]; + { + let builder = WriterBuilder::new() + .has_headers(true) + .with_delimiter(delimiter); + let mut writer = builder.build(&mut bytes); + for batch in batches { + writer.write(batch)?; + } + } + let formatted = String::from_utf8(bytes) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + Ok(formatted) +} + impl PrintFormat { /// print the batches to stdout using the specified format pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { match self { - PrintFormat::Csv => { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new().has_headers(true); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } - } - let csv = String::from_utf8(bytes) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - println!("{}", csv); - } + PrintFormat::Csv => println!("{}", print_batches_with_sep(batches, b',')?), + PrintFormat::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), PrintFormat::Table => pretty::print_batches(batches)?, } Ok(()) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 05c6766cee72..7229a688f17d 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -177,10 +177,10 @@ async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFo } fn is_valid_format(format: String) -> std::result::Result<(), String> { - match format.to_lowercase().as_str() { - "csv" => Ok(()), - "table" => Ok(()), - _ => Err(format!("Format '{}' not supported", format)), + if format.parse::().is_ok() { + Ok(()) + } else { + Err(format!("Format '{}' not supported", format)) } } From 5886d77f09f03b39f630812ddd3039b3f42676ac Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:43:31 +0800 Subject: [PATCH 06/13] use Self whereas possible --- datafusion-cli/src/format/print_format.rs | 14 +++++++------- datafusion-cli/src/main.rs | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 0793a4ebf010..a9fc56b90524 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -32,11 +32,11 @@ pub enum PrintFormat { impl FromStr for PrintFormat { type Err = (); - fn from_str(s: &str) -> std::result::Result { + fn from_str(s: &str) -> std::result::Result { match s { - "csv" => Ok(PrintFormat::Csv), - "tsv" => Ok(PrintFormat::Tsv), - "table" => Ok(PrintFormat::Table), + "csv" => Ok(Self::Csv), + "tsv" => Ok(Self::Tsv), + "table" => Ok(Self::Table), _ => Err(()), } } @@ -62,9 +62,9 @@ impl PrintFormat { /// print the batches to stdout using the specified format pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { match self { - PrintFormat::Csv => println!("{}", print_batches_with_sep(batches, b',')?), - PrintFormat::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), - PrintFormat::Table => pretty::print_batches(batches)?, + Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), + Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), + Self::Table => pretty::print_batches(batches)?, } Ok(()) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 7229a688f17d..39d7b4d306a9 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -66,7 +66,7 @@ pub async fn main() { ) .arg( Arg::with_name("format") - .help("Output format (possible values: table, csv)") + .help("Output format (possible values: table, csv, tsv)") .long("format") .default_value("table") .validator(is_valid_format) From 78480cc6866175b783dcfada77675a97992b8f02 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:57:50 +0800 Subject: [PATCH 07/13] add json support --- datafusion-cli/src/format/print_format.rs | 16 ++++++++++++++++ datafusion-cli/src/main.rs | 2 +- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index a9fc56b90524..276e0d04b27a 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -17,6 +17,7 @@ //! Print format variants use arrow::csv::writer::WriterBuilder; +use arrow::json::ArrayWriter; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; use datafusion::error::{DataFusionError, Result}; @@ -28,6 +29,7 @@ pub enum PrintFormat { Csv, Tsv, Table, + Json, } impl FromStr for PrintFormat { @@ -37,11 +39,24 @@ impl FromStr for PrintFormat { "csv" => Ok(Self::Csv), "tsv" => Ok(Self::Tsv), "table" => Ok(Self::Table), + "json" => Ok(Self::Json), _ => Err(()), } } } +fn print_batches_to_json(batches: &[RecordBatch]) -> Result { + let mut bytes = vec![]; + { + let mut writer = ArrayWriter::new(&mut bytes); + writer.write_batches(batches)?; + writer.finish()?; + } + let formatted = String::from_utf8(bytes) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + Ok(formatted) +} + fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { let mut bytes = vec![]; { @@ -65,6 +80,7 @@ impl PrintFormat { Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), Self::Table => pretty::print_batches(batches)?, + Self::Json => println!("{}", print_batches_to_json(batches)?), } Ok(()) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 39d7b4d306a9..22f66c5c2aeb 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -66,7 +66,7 @@ pub async fn main() { ) .arg( Arg::with_name("format") - .help("Output format (possible values: table, csv, tsv)") + .help("Output format (possible values: table, csv, tsv, json)") .long("format") .default_value("table") .validator(is_valid_format) From 266b57cbca37008616bcd32099c8907f37134d30 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 21:18:35 +0800 Subject: [PATCH 08/13] adding unit test --- datafusion-cli/src/format/print_format.rs | 83 ++++++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 276e0d04b27a..92cf798ba4bb 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -24,7 +24,7 @@ use datafusion::error::{DataFusionError, Result}; use std::str::FromStr; /// Allow records to be printed in different formats -#[derive(Debug, Clone)] +#[derive(Debug, PartialEq, Eq, Clone)] pub enum PrintFormat { Csv, Tsv, @@ -85,3 +85,84 @@ impl PrintFormat { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::Int32Array; + use arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + #[test] + fn test_from_str() { + let format = "csv".parse::().unwrap(); + assert_eq!(PrintFormat::Csv, format); + + let format = "tsv".parse::().unwrap(); + assert_eq!(PrintFormat::Tsv, format); + + let format = "json".parse::().unwrap(); + assert_eq!(PrintFormat::Json, format); + + let format = "table".parse::().unwrap(); + assert_eq!(PrintFormat::Table, format); + } + + #[test] + fn test_from_str_failure() { + assert_eq!(true, "pretty".parse::().is_err()); + } + + #[test] + fn test_print_batches_with_sep() { + let batches = vec![]; + assert_eq!("", print_batches_with_sep(&batches, b',').unwrap()); + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ], + ) + .unwrap(); + + let batches = vec![batch]; + let r = print_batches_with_sep(&batches, b',').unwrap(); + assert_eq!("a,b,c\n1,4,7\n2,5,8\n3,6,9\n", r); + } + + #[test] + fn test_print_batches_to_json_empty() { + let batches = vec![]; + let r = print_batches_to_json(&batches).unwrap(); + assert_eq!("", r); + + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Int32, false), + Field::new("c", DataType::Int32, false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(Int32Array::from(vec![4, 5, 6])), + Arc::new(Int32Array::from(vec![7, 8, 9])), + ], + ) + .unwrap(); + + let batches = vec![batch]; + let r = print_batches_to_json(&batches).unwrap(); + assert_eq!("[{\"a\":1,\"b\":4,\"c\":7},{\"a\":2,\"b\":5,\"c\":8},{\"a\":3,\"b\":6,\"c\":9}]", r); + } +} From 723a2391cb0ba75c93c8057addd03ffa72068af2 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 22:21:54 +0800 Subject: [PATCH 09/13] remove redundant clone --- datafusion-cli/src/format/print_format.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 92cf798ba4bb..85caaa3c5276 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -125,7 +125,7 @@ mod tests { ])); let batch = RecordBatch::try_new( - schema.clone(), + schema, vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![4, 5, 6])), @@ -152,7 +152,7 @@ mod tests { ])); let batch = RecordBatch::try_new( - schema.clone(), + schema, vec![ Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(Int32Array::from(vec![4, 5, 6])), From addbc93b6d48533cab154912586c57fa793083a7 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 14:28:10 +0800 Subject: [PATCH 10/13] add csv mode to datafusion cli --- datafusion-cli/src/format/print_format.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index 921e29f7fae7..ba48e0572c71 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -19,7 +19,9 @@ use arrow::csv::writer::WriterBuilder; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; +use datafusion::error::Result; use datafusion::error::{DataFusionError, Result}; +use std::io::{Read, Seek, SeekFrom}; use std::str::FromStr; /// Allow records to be printed in different formats From d40d812dba2268ad0c6b5076c9939156924e8358 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:37:26 +0800 Subject: [PATCH 11/13] adding tsv format --- datafusion-cli/src/format/print_format.rs | 33 ++++++++++++++--------- datafusion-cli/src/main.rs | 8 +++--- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index ba48e0572c71..b26e94e6dcb7 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -28,6 +28,7 @@ use std::str::FromStr; #[derive(Debug, Clone)] pub enum PrintFormat { Csv, + Tsv, Table, } @@ -36,29 +37,35 @@ impl FromStr for PrintFormat { fn from_str(s: &str) -> std::result::Result { match s { "csv" => Ok(PrintFormat::Csv), + "tsv" => Ok(PrintFormat::Tsv), "table" => Ok(PrintFormat::Table), _ => Err(()), } } } +fn print_batches_with_sep(batches: &[RecordBatch], delimiter: u8) -> Result { + let mut bytes = vec![]; + { + let builder = WriterBuilder::new() + .has_headers(true) + .with_delimiter(delimiter); + let mut writer = builder.build(&mut bytes); + for batch in batches { + writer.write(batch)?; + } + } + let formatted = String::from_utf8(bytes) + .map_err(|e| DataFusionError::Execution(e.to_string()))?; + Ok(formatted) +} + impl PrintFormat { /// print the batches to stdout using the specified format pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { match self { - PrintFormat::Csv => { - let mut bytes = vec![]; - { - let builder = WriterBuilder::new().has_headers(true); - let mut writer = builder.build(&mut bytes); - for batch in batches { - writer.write(batch)?; - } - } - let csv = String::from_utf8(bytes) - .map_err(|e| DataFusionError::Execution(e.to_string()))?; - println!("{}", csv); - } + PrintFormat::Csv => println!("{}", print_batches_with_sep(batches, b',')?), + PrintFormat::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), PrintFormat::Table => pretty::print_batches(batches)?, } Ok(()) diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 05c6766cee72..7229a688f17d 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -177,10 +177,10 @@ async fn exec_from_repl(execution_config: ExecutionConfig, print_format: PrintFo } fn is_valid_format(format: String) -> std::result::Result<(), String> { - match format.to_lowercase().as_str() { - "csv" => Ok(()), - "table" => Ok(()), - _ => Err(format!("Format '{}' not supported", format)), + if format.parse::().is_ok() { + Ok(()) + } else { + Err(format!("Format '{}' not supported", format)) } } From bca26306b729311d3d20392962852f73aaa46e15 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sat, 8 May 2021 20:43:31 +0800 Subject: [PATCH 12/13] use Self whereas possible --- datafusion-cli/src/format/print_format.rs | 14 +++++++------- datafusion-cli/src/main.rs | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index b26e94e6dcb7..c5756e655744 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -34,11 +34,11 @@ pub enum PrintFormat { impl FromStr for PrintFormat { type Err = (); - fn from_str(s: &str) -> std::result::Result { + fn from_str(s: &str) -> std::result::Result { match s { - "csv" => Ok(PrintFormat::Csv), - "tsv" => Ok(PrintFormat::Tsv), - "table" => Ok(PrintFormat::Table), + "csv" => Ok(Self::Csv), + "tsv" => Ok(Self::Tsv), + "table" => Ok(Self::Table), _ => Err(()), } } @@ -64,9 +64,9 @@ impl PrintFormat { /// print the batches to stdout using the specified format pub fn print_batches(&self, batches: &[RecordBatch]) -> Result<()> { match self { - PrintFormat::Csv => println!("{}", print_batches_with_sep(batches, b',')?), - PrintFormat::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), - PrintFormat::Table => pretty::print_batches(batches)?, + Self::Csv => println!("{}", print_batches_with_sep(batches, b',')?), + Self::Tsv => println!("{}", print_batches_with_sep(batches, b'\t')?), + Self::Table => pretty::print_batches(batches)?, } Ok(()) } diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs index 7229a688f17d..39d7b4d306a9 100644 --- a/datafusion-cli/src/main.rs +++ b/datafusion-cli/src/main.rs @@ -66,7 +66,7 @@ pub async fn main() { ) .arg( Arg::with_name("format") - .help("Output format (possible values: table, csv)") + .help("Output format (possible values: table, csv, tsv)") .long("format") .default_value("table") .validator(is_valid_format) From 39f460efc38d51628e3dbd94880e2bf2e3470902 Mon Sep 17 00:00:00 2001 From: Jiayu Liu Date: Sun, 9 May 2021 23:02:07 +0800 Subject: [PATCH 13/13] prune import --- datafusion-cli/src/format/print_format.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion-cli/src/format/print_format.rs b/datafusion-cli/src/format/print_format.rs index c5756e655744..a9fc56b90524 100644 --- a/datafusion-cli/src/format/print_format.rs +++ b/datafusion-cli/src/format/print_format.rs @@ -19,9 +19,7 @@ use arrow::csv::writer::WriterBuilder; use datafusion::arrow::record_batch::RecordBatch; use datafusion::arrow::util::pretty; -use datafusion::error::Result; use datafusion::error::{DataFusionError, Result}; -use std::io::{Read, Seek, SeekFrom}; use std::str::FromStr; /// Allow records to be printed in different formats