diff --git a/datafusion-examples/examples/flight_client.rs b/datafusion-examples/examples/flight_client.rs index 469f3ebef0c8..e03b2c14c00a 100644 --- a/datafusion-examples/examples/flight_client.rs +++ b/datafusion-examples/examples/flight_client.rs @@ -20,7 +20,7 @@ use std::sync::Arc; use arrow::io::flight::deserialize_schemas; use arrow_format::flight::data::{flight_descriptor, FlightDescriptor, Ticket}; use arrow_format::flight::service::flight_service_client::FlightServiceClient; -use datafusion::arrow::io::print; +use datafusion::arrow_print; use std::collections::HashMap; /// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box> { } // print the results - print::print(&results); + println!("{}", arrow_print::write(&results)); Ok(()) } diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml index 69e82b1fee86..5a79041bbb85 100644 --- a/datafusion/Cargo.toml +++ b/datafusion/Cargo.toml @@ -79,11 +79,13 @@ rand = "0.8" num-traits = { version = "0.2", optional = true } pyo3 = { version = "0.14", optional = true } avro-schema = { version = "0.2", optional = true } +# used to print arrow arrays in a nice columnar format +comfy-table = { version = "5.0", default-features = false } [dependencies.arrow] package = "arrow2" version="0.8" -features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "compute"] +features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"] [dev-dependencies] criterion = "0.3" diff --git a/datafusion/src/arrow_print.rs b/datafusion/src/arrow_print.rs new file mode 100644 index 000000000000..9232870c5e94 --- /dev/null +++ b/datafusion/src/arrow_print.rs @@ -0,0 +1,151 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Fork of arrow::io::print to implement custom Binary Array formatting logic. + +// adapted from https://github.com/jorgecarleitao/arrow2/blob/ef7937dfe56033c2cc491482c67587b52cd91554/src/array/display.rs +// see: https://github.com/jorgecarleitao/arrow2/issues/771 + +use arrow::{array::*, record_batch::RecordBatch}; + +use comfy_table::{Cell, Table}; + +macro_rules! 
dyn_display { + ($array:expr, $ty:ty, $expr:expr) => {{ + let a = $array.as_any().downcast_ref::<$ty>().unwrap(); + Box::new(move |row: usize| format!("{}", $expr(a.value(row)))) + }}; +} + +fn df_get_array_value_display<'a>( + array: &'a dyn Array, +) -> Box String + 'a> { + use arrow::datatypes::DataType::*; + match array.data_type() { + Binary => dyn_display!(array, BinaryArray, |x: &[u8]| { + x.iter().fold("".to_string(), |mut acc, x| { + acc.push_str(&format!("{:02x}", x)); + acc + }) + }), + LargeBinary => dyn_display!(array, BinaryArray, |x: &[u8]| { + x.iter().fold("".to_string(), |mut acc, x| { + acc.push_str(&format!("{:02x}", x)); + acc + }) + }), + List(_) => { + let f = |x: Box| { + let display = df_get_array_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, ListArray, f) + } + FixedSizeList(_, _) => { + let f = |x: Box| { + let display = df_get_array_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, FixedSizeListArray, f) + } + LargeList(_) => { + let f = |x: Box| { + let display = df_get_array_value_display(x.as_ref()); + let string_values = (0..x.len()).map(display).collect::>(); + format!("[{}]", string_values.join(", ")) + }; + dyn_display!(array, ListArray, f) + } + Struct(_) => { + let a = array.as_any().downcast_ref::().unwrap(); + let displays = a + .values() + .iter() + .map(|x| df_get_array_value_display(x.as_ref())) + .collect::>(); + Box::new(move |row: usize| { + let mut string = displays + .iter() + .zip(a.fields().iter().map(|f| f.name())) + .map(|(f, name)| (f(row), name)) + .fold("{".to_string(), |mut acc, (v, name)| { + acc.push_str(&format!("{}: {}, ", name, v)); + acc + }); + if string.len() > 1 { + // remove last ", " + string.pop(); + string.pop(); + } + string.push('}'); + string + }) + } + _ => 
get_display(array), + } +} + +/// Returns a function of index returning the string representation of the item of `array`. +/// This outputs an empty string on nulls. +pub fn df_get_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> String + 'a> { + let value_display = df_get_array_value_display(array); + Box::new(move |row| { + if array.is_null(row) { + "".to_string() + } else { + value_display(row) + } + }) +} + +/// Convert a series of record batches into a String +pub fn write(results: &[RecordBatch]) -> String { + let mut table = Table::new(); + table.load_preset("||--+-++| ++++++"); + + if results.is_empty() { + return table.to_string(); + } + + let schema = results[0].schema(); + + let mut header = Vec::new(); + for field in schema.fields() { + header.push(Cell::new(field.name())); + } + table.set_header(header); + + for batch in results { + let displayes = batch + .columns() + .iter() + .map(|array| df_get_display(array.as_ref())) + .collect::<Vec<_>>(); + + for row in 0..batch.num_rows() { + let mut cells = Vec::new(); + (0..batch.num_columns()).for_each(|col| { + let string = displayes[col](row); + cells.push(Cell::new(&string)); + }); + table.add_row(cells); + } + } + table.to_string() +} diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index aa440ca54455..4cf427d1be2b 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -29,7 +29,6 @@ use crate::{ dataframe::*, physical_plan::{collect, collect_partitioned}, }; -use arrow::io::print; use arrow::record_batch::RecordBatch; use crate::physical_plan::{ @@ -168,14 +167,14 @@ impl DataFrame for DataFrameImpl { /// Print results. async fn show(&self) -> Result<()> { let results = self.collect().await?; - print::print(&results); + print!("{}", crate::arrow_print::write(&results)); Ok(()) } /// Print results and limit rows. 
async fn show_limit(&self, num: usize) -> Result<()> { let results = self.limit(num)?.collect().await?; - print::print(&results); + print!("{}", crate::arrow_print::write(&results)); Ok(()) } diff --git a/datafusion/src/lib.rs b/datafusion/src/lib.rs index 544d566273bd..dd735b7621db 100644 --- a/datafusion/src/lib.rs +++ b/datafusion/src/lib.rs @@ -57,7 +57,7 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = datafusion::arrow::io::print::write(&results); +//! let pretty_results = datafusion::arrow_print::write(&results); //! //! let expected = vec![ //! "+---+--------------------------+", @@ -92,7 +92,7 @@ //! let results: Vec = df.collect().await?; //! //! // format the results -//! let pretty_results = datafusion::arrow::io::print::write(&results); +//! let pretty_results = datafusion::arrow_print::write(&results); //! //! let expected = vec![ //! "+---+----------------+", @@ -229,6 +229,7 @@ pub mod variable; pub use arrow; pub use parquet; +pub mod arrow_print; mod arrow_temporal_util; pub mod field_util; diff --git a/datafusion/src/physical_plan/file_format/csv.rs b/datafusion/src/physical_plan/file_format/csv.rs index e4b93e88c3de..00b303575b5d 100644 --- a/datafusion/src/physical_plan/file_format/csv.rs +++ b/datafusion/src/physical_plan/file_format/csv.rs @@ -250,6 +250,7 @@ impl ExecutionPlan for CsvExec { mod tests { use super::*; use crate::{ + assert_batches_eq, datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem}, scalar::ScalarValue, test_util::aggr_test_schema, @@ -298,7 +299,7 @@ mod tests { "+----+-----+------------+", ]; - crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]); + assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]); Ok(()) } @@ -343,7 +344,7 @@ mod tests { "+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+", 
]; - crate::assert_batches_eq!(expected, &[batch]); + assert_batches_eq!(expected, &[batch]); Ok(()) } @@ -396,7 +397,7 @@ mod tests { "| b | 2021-10-26 |", "+----+------------+", ]; - crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]); + assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]); Ok(()) } diff --git a/datafusion/src/physical_plan/file_format/file_stream.rs b/datafusion/src/physical_plan/file_format/file_stream.rs index 6c6c7e6c31d1..c90df7e0b009 100644 --- a/datafusion/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/src/physical_plan/file_format/file_stream.rs @@ -192,6 +192,7 @@ mod tests { use super::*; use crate::{ + assert_batches_eq, error::Result, test::{make_partition, object_store::TestObjectStore}, }; @@ -230,7 +231,7 @@ mod tests { let batches = create_and_collect(None).await; #[rustfmt::skip] - crate::assert_batches_eq!(&[ + assert_batches_eq!(&[ "+---+", "| i |", "+---+", @@ -254,7 +255,7 @@ mod tests { async fn with_limit_between_files() -> Result<()> { let batches = create_and_collect(Some(5)).await; #[rustfmt::skip] - crate::assert_batches_eq!(&[ + assert_batches_eq!(&[ "+---+", "| i |", "+---+", @@ -273,7 +274,7 @@ mod tests { async fn with_limit_at_middle_of_batch() -> Result<()> { let batches = create_and_collect(Some(6)).await; #[rustfmt::skip] - crate::assert_batches_eq!(&[ + assert_batches_eq!(&[ "+---+", "| i |", "+---+", diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs index 0d372810985d..036b605154af 100644 --- a/datafusion/src/physical_plan/file_format/mod.rs +++ b/datafusion/src/physical_plan/file_format/mod.rs @@ -269,6 +269,7 @@ fn create_dict_array( #[cfg(test)] mod tests { use crate::{ + assert_batches_eq, test::{build_table_i32, columns, object_store::TestObjectStore}, test_util::aggr_test_schema, }; @@ -399,7 +400,7 @@ mod tests { "| 2 | 0 | 12 | 2021 | 26 |", "+---+----+----+------+-----+", ]; - 
crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); // project another batch that is larger than the previous one let file_batch = build_table_i32( @@ -429,7 +430,7 @@ mod tests { "| 9 | -6 | 16 | 2021 | 27 |", "+---+-----+----+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); // project another batch that is smaller than the previous one let file_batch = build_table_i32( @@ -457,7 +458,7 @@ mod tests { "| 3 | 4 | 6 | 2021 | 28 |", "+---+---+---+------+-----+", ]; - crate::assert_batches_eq!(expected, &[projected_batch]); + assert_batches_eq!(expected, &[projected_batch]); } // sets default for configs that play no role in projections diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs index 55365e4b84d2..633343c5f76f 100644 --- a/datafusion/src/physical_plan/file_format/parquet.rs +++ b/datafusion/src/physical_plan/file_format/parquet.rs @@ -458,6 +458,7 @@ fn read_partition( #[cfg(test)] mod tests { + use crate::assert_batches_eq; use crate::datasource::{ file_format::{parquet::ParquetFormat, FileFormat}, object_store::local::{ @@ -566,7 +567,7 @@ mod tests { "| 1 | false | 1 | 10 |", "+----+----------+-------------+-------+", ]; - crate::assert_batches_eq!(expected, &[batch]); + assert_batches_eq!(expected, &[batch]); let batch = results.next().await; assert!(batch.is_none()); diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs index 90608db172d5..900a29c32de8 100644 --- a/datafusion/src/physical_plan/hash_aggregate.rs +++ b/datafusion/src/physical_plan/hash_aggregate.rs @@ -1023,10 +1023,11 @@ mod tests { use futures::FutureExt; use super::*; + use crate::assert_batches_sorted_eq; + use crate::physical_plan::common; use crate::physical_plan::expressions::{col, Avg}; use crate::test::assert_is_pending; use 
crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec}; - use crate::{assert_batches_sorted_eq, physical_plan::common}; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; diff --git a/datafusion/src/physical_plan/sort_preserving_merge.rs b/datafusion/src/physical_plan/sort_preserving_merge.rs index ec3ad9f9a34c..bc9aada8cee9 100644 --- a/datafusion/src/physical_plan/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sort_preserving_merge.rs @@ -669,7 +669,8 @@ mod tests { use crate::arrow::array::*; use crate::arrow::datatypes::*; - use crate::arrow::io::print; + use crate::arrow_print; + use crate::assert_batches_eq; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; use crate::physical_plan::expressions::col; use crate::physical_plan::file_format::{CsvExec, PhysicalPlanConfig}; @@ -677,7 +678,7 @@ mod tests { use crate::physical_plan::sort::SortExec; use crate::physical_plan::{collect, common}; use crate::test::{self, assert_is_pending}; - use crate::{assert_batches_eq, test_util}; + use crate::test_util; use super::*; use arrow::datatypes::{DataType, Field, Schema}; @@ -1008,8 +1009,8 @@ mod tests { let basic = basic_sort(csv.clone(), sort.clone()).await; let partition = partition_sort(csv, sort).await; - let basic = print::write(&[basic]); - let partition = print::write(&[partition]); + let basic = arrow_print::write(&[basic]); + let partition = arrow_print::write(&[partition]); assert_eq!( basic, partition, @@ -1106,8 +1107,8 @@ mod tests { assert_eq!(basic.num_rows(), 300); assert_eq!(partition.num_rows(), 300); - let basic = print::write(&[basic]); - let partition = print::write(&[partition]); + let basic = arrow_print::write(&[basic]); + let partition = arrow_print::write(&[partition]); assert_eq!(basic, partition); } @@ -1140,8 +1141,8 @@ mod tests { assert_eq!(basic.num_rows(), 300); assert_eq!(merged.iter().map(|x| x.num_rows()).sum::(), 300); - let basic = print::write(&[basic]); - let 
partition = print::write(merged.as_slice()); + let basic = arrow_print::write(&[basic]); + let partition = arrow_print::write(merged.as_slice()); assert_eq!(basic, partition); } @@ -1272,8 +1273,8 @@ mod tests { let merged = merged.remove(0); let basic = basic_sort(batches, sort.clone()).await; - let basic = print::write(&[basic]); - let partition = print::write(&[merged]); + let basic = arrow_print::write(&[basic]); + let partition = arrow_print::write(&[merged]); assert_eq!( basic, partition, diff --git a/datafusion/src/test_util.rs b/datafusion/src/test_util.rs index 5d5494fa58eb..06850f6bdc20 100644 --- a/datafusion/src/test_util.rs +++ b/datafusion/src/test_util.rs @@ -38,7 +38,7 @@ macro_rules! assert_batches_eq { let expected_lines: Vec = $EXPECTED_LINES.iter().map(|&s| s.into()).collect(); - let formatted = arrow::io::print::write($CHUNKS); + let formatted = $crate::arrow_print::write($CHUNKS); let actual_lines: Vec<&str> = formatted.trim().lines().collect(); @@ -72,7 +72,7 @@ macro_rules! 
assert_batches_sorted_eq { expected_lines.as_mut_slice()[2..num_lines - 1].sort_unstable() } - let formatted = arrow::io::print::write($CHUNKS); + let formatted = $crate::arrow_print::write($CHUNKS); // fix for windows: \r\n --> let mut actual_lines: Vec<&str> = formatted.trim().lines().collect(); diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs index ed21fae8ad2f..3c27b82a3b0b 100644 --- a/datafusion/tests/parquet_pruning.rs +++ b/datafusion/tests/parquet_pruning.rs @@ -32,8 +32,8 @@ use arrow::{ record_batch::RecordBatch, }; use chrono::{Datelike, Duration}; -use datafusion::arrow::io::print; use datafusion::{ + arrow_print, datasource::TableProvider, logical_plan::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder}, physical_plan::{ @@ -530,7 +530,7 @@ impl ContextWithParquet { .collect() .await .expect("getting input"); - let pretty_input = print::write(&input); + let pretty_input = arrow_print::write(&input); let logical_plan = self.ctx.optimize(&logical_plan).expect("optimizing plan"); let physical_plan = self @@ -566,7 +566,7 @@ impl ContextWithParquet { let result_rows = results.iter().map(|b| b.num_rows()).sum(); - let pretty_results = print::write(&results); + let pretty_results = arrow_print::write(&results); let sql = sql.into(); TestOutput { diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs index f2ae4eba0130..3a08ee031f12 100644 --- a/datafusion/tests/sql/mod.rs +++ b/datafusion/tests/sql/mod.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use chrono::prelude::*; use chrono::Duration; -use datafusion::arrow::io::print; use datafusion::arrow::{array::*, datatypes::*, record_batch::RecordBatch}; use datafusion::assert_batches_eq; use datafusion::assert_batches_sorted_eq; diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index fe83f69a79a6..72ab6f9499c9 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -64,10 +64,10 
@@ use arrow::{ array::{Int64Array, Utf8Array}, datatypes::SchemaRef, error::ArrowError, - io::print::write, record_batch::RecordBatch, }; use datafusion::{ + arrow_print::write, error::{DataFusionError, Result}, execution::context::ExecutionContextState, execution::context::QueryPlanner,