From 7c7014b68baee58dc93a78c930778a07b4f9cc2f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 30 Mar 2020 21:23:49 -0600 Subject: [PATCH] ARROW-8264: [Rust] [DataFusion] Add utility for printing batches This PR adds a utility for printing batches and updates the examples to call this code. This makes the examples more concise and the output more consistent. The code was previously in the CLI but is now in the new utils module. ``` $ cargo run --example csv_sql Compiling datafusion v1.0.0-SNAPSHOT (/home/andy/git/andygrove/arrow/rust/datafusion) Finished dev [unoptimized + debuginfo] target(s) in 4.16s Running `target/debug/examples/csv_sql` +----+----------------------+--------------------+ | c1 | MIN | MAX | +----+----------------------+--------------------+ | a | 0.02182578039211991 | 0.9567595541247681 | | e | 0.01479305307777301 | 0.9965400387585364 | | d | 0.061029375346466685 | 0.9748360509016578 | | c | 0.2667177795079635 | 0.991517828651004 | | b | 0.04893135681998029 | 0.9185813970744787 | +----+----------------------+--------------------+ ``` Closes #6754 from andygrove/datafusion-utils Authored-by: Andy Grove Signed-off-by: Andy Grove --- rust/datafusion/examples/csv_sql.rs | 48 +---- .../{flight-client.rs => flight_client.rs} | 21 +-- .../{flight-server.rs => flight_server.rs} | 0 rust/datafusion/examples/memory_table_api.rs | 30 +-- rust/datafusion/examples/parquet_sql.rs | 43 +---- rust/datafusion/src/bin/repl.rs | 93 +--------- rust/datafusion/src/lib.rs | 1 + rust/datafusion/src/utils.rs | 173 ++++++++++++++++++ 8 files changed, 197 insertions(+), 212 deletions(-) rename rust/datafusion/examples/{flight-client.rs => flight_client.rs} (86%) rename rust/datafusion/examples/{flight-server.rs => flight_server.rs} (100%) create mode 100644 rust/datafusion/src/utils.rs diff --git a/rust/datafusion/examples/csv_sql.rs b/rust/datafusion/examples/csv_sql.rs index ad4a2a119e18b..310c90a2f1307 100644 --- a/rust/datafusion/examples/csv_sql.rs +++ b/rust/datafusion/examples/csv_sql.rs @@ -15,16 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::sync::Arc; - -extern crate arrow; -extern crate datafusion; - -use arrow::array::{Float64Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; +use datafusion::utils; /// This example demonstrates executing a simple query against an Arrow data source (CSV) and /// fetching results @@ -33,7 +28,7 @@ fn main() -> Result<()> { let mut ctx = ExecutionContext::new(); // define schema for data source (csv file) - let schema = Arc::new(Schema::new(vec![ + let schema = Schema::new(vec![ Field::new("c1", DataType::Utf8, false), Field::new("c2", DataType::UInt32, false), Field::new("c3", DataType::Int8, false), @@ -47,7 +42,7 @@ fn main() -> Result<()> { Field::new("c11", DataType::Float32, false), Field::new("c12", DataType::Float64, false), Field::new("c13", DataType::Utf8, false), - ])); + ]); let testdata = std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined"); @@ -69,41 +64,8 @@ fn main() -> Result<()> { // execute the query let results = ctx.collect(plan.as_ref())?; - // iterate over the results - results.iter().for_each(|batch| { - println!( - "RecordBatch has {} rows and {} columns", - batch.num_rows(), - batch.num_columns() - ); - - let c1 = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let min = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - let max = batch - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - - for i in 0..batch.num_rows() { - println!( - "{}, Min: {}, Max: {}", - c1.value(i), - min.value(i), - max.value(i), - ); - } - }); + // print the results + utils::print_batches(&results)?; Ok(()) } diff --git a/rust/datafusion/examples/flight-client.rs b/rust/datafusion/examples/flight_client.rs similarity index 86% rename from rust/datafusion/examples/flight-client.rs rename to rust/datafusion/examples/flight_client.rs index 128d6f56c1ac7..f96eb86a08802 100644 --- a/rust/datafusion/examples/flight-client.rs +++ b/rust/datafusion/examples/flight_client.rs @@ -18,9 +18,11 @@ use std::convert::TryFrom; use std::sync::Arc; -use arrow::array::Int32Array; use arrow::datatypes::Schema; use arrow::flight::flight_data_to_batch; + +use datafusion::utils; + use flight::flight_descriptor; use flight::flight_service_client::FlightServiceClient; use flight::{FlightDescriptor, Ticket}; @@ -58,22 +60,15 @@ async fn main() -> Result<(), Box> { println!("Schema: {:?}", schema); // all the remaining stream messages should be dictionary and record batches + let mut results = vec![]; while let Some(flight_data) = stream.message().await? { // the unwrap is infallible and thus safe let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap(); - - println!( - "record_batch has {} columns and {} rows", - record_batch.num_columns(), - record_batch.num_rows() - ); - let column = record_batch.column(0); - let column = column - .as_any() - .downcast_ref::() - .expect("Unable to get column"); - println!("Column 1: {:?}", column); + results.push(record_batch); } + // print the results + utils::print_batches(&results).unwrap(); + Ok(()) } diff --git a/rust/datafusion/examples/flight-server.rs b/rust/datafusion/examples/flight_server.rs similarity index 100% rename from rust/datafusion/examples/flight-server.rs rename to rust/datafusion/examples/flight_server.rs diff --git a/rust/datafusion/examples/memory_table_api.rs b/rust/datafusion/examples/memory_table_api.rs index cf42264fd4504..27b308c4d8688 100644 --- a/rust/datafusion/examples/memory_table_api.rs +++ b/rust/datafusion/examples/memory_table_api.rs @@ -18,9 +18,6 @@ use std::boxed::Box; use std::sync::Arc; -extern crate arrow; -extern crate datafusion; - use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; @@ -29,6 +26,7 @@ use datafusion::datasource::MemTable; use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; use datafusion::logicalplan::{Expr, ScalarValue}; +use datafusion::utils; /// This example demonstrates basic uses of the Table API on an in-memory table fn main() -> Result<()> { @@ -63,30 +61,8 @@ fn main() -> Result<()> { // execute let results = t.collect(&mut ctx, 10)?; - // print results - results.iter().for_each(|batch| { - println!( - "RecordBatch has {} rows and {} columns", - batch.num_rows(), - batch.num_columns() - ); - - let c1 = batch - .column(0) - .as_any() - .downcast_ref::() - .expect("String type"); - - let c2 = batch - .column(1) - .as_any() - .downcast_ref::() - .expect("Int type"); - - for i in 0..batch.num_rows() { - println!("{}, {}", c1.value(i), c2.value(i),); - } - }); + // print the results + utils::print_batches(&results)?; Ok(()) } diff --git a/rust/datafusion/examples/parquet_sql.rs b/rust/datafusion/examples/parquet_sql.rs index 8a2f16b434110..696854da09cec 100644 --- a/rust/datafusion/examples/parquet_sql.rs +++ b/rust/datafusion/examples/parquet_sql.rs @@ -15,13 +15,9 @@ // specific language governing permissions and limitations // under the License. -extern crate arrow; -extern crate datafusion; - -use arrow::array::{Float64Array, Int32Array, StringArray}; - use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; +use datafusion::utils; /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and /// fetching results @@ -49,41 +45,8 @@ fn main() -> Result<()> { // execute the query let results = ctx.collect(plan.as_ref())?; - // iterate over the results - results.iter().for_each(|batch| { - println!( - "RecordBatch has {} rows and {} columns", - batch.num_rows(), - batch.num_columns() - ); - - let int = batch - .column(0) - .as_any() - .downcast_ref::() - .unwrap(); - - let double = batch - .column(1) - .as_any() - .downcast_ref::() - .unwrap(); - - let date = batch - .column(2) - .as_any() - .downcast_ref::() - .unwrap(); - - for i in 0..batch.num_rows() { - println!( - "Date: {}, Int: {}, Double: {}", - date.value(i), - int.value(i), - double.value(i) - ); - } - }); + // print the results + utils::print_batches(&results)?; Ok(()) } diff --git a/rust/datafusion/src/bin/repl.rs b/rust/datafusion/src/bin/repl.rs index ea9cd2c84d1c8..a16068a35c1c4 100644 --- a/rust/datafusion/src/bin/repl.rs +++ b/rust/datafusion/src/bin/repl.rs @@ -17,12 +17,10 @@ #![allow(bare_trait_objects)] -use arrow::array::*; -use arrow::datatypes::{DataType, TimeUnit}; use clap::{crate_version, App, Arg}; -use datafusion::error::{ExecutionError, Result}; +use datafusion::error::Result; use datafusion::execution::context::ExecutionContext; -use prettytable::{Cell, Row, Table}; +use datafusion::utils; use rustyline::Editor; use std::env; use std::path::Path; @@ -118,29 +116,9 @@ fn exec_and_print( return Ok(()); } - let mut row_count = 0; - let mut table = Table::new(); - let schema = results[0].schema(); + utils::print_batches(&results)?; - let mut header = Vec::new(); - for field in schema.fields() { - header.push(Cell::new(&field.name())); - } - table.add_row(Row::new(header)); - - for batch in results { - row_count += batch.num_rows(); - - for row in 0..batch.num_rows() { - let mut cells = Vec::new(); - for col in 0..batch.num_columns() { - let column = batch.column(col); - cells.push(Cell::new(&str_value(column.clone(), row)?)); - } - table.add_row(Row::new(cells)); - } - } - table.printstd(); + let row_count: usize = results.iter().map(|b| b.num_rows()).sum(); if row_count > 1 { println!( @@ -158,66 +136,3 @@ fn exec_and_print( Ok(()) } - -macro_rules! make_string { - ($array_type:ty, $column: ident, $row: ident) => {{ - Ok($column - .as_any() - .downcast_ref::<$array_type>() - .unwrap() - .value($row) - .to_string()) - }}; -} - -fn str_value(column: ArrayRef, row: usize) -> Result { - match column.data_type() { - DataType::Utf8 => Ok(column - .as_any() - .downcast_ref::() - .unwrap() - .value(row) - .to_string()), - DataType::Boolean => make_string!(BooleanArray, column, row), - DataType::Int16 => make_string!(Int16Array, column, row), - DataType::Int32 => make_string!(Int32Array, column, row), - DataType::Int64 => make_string!(Int64Array, column, row), - DataType::UInt8 => make_string!(UInt8Array, column, row), - DataType::UInt16 => make_string!(UInt16Array, column, row), - DataType::UInt32 => make_string!(UInt32Array, column, row), - DataType::UInt64 => make_string!(UInt64Array, column, row), - DataType::Float16 => make_string!(Float32Array, column, row), - DataType::Float32 => make_string!(Float32Array, column, row), - DataType::Float64 => make_string!(Float64Array, column, row), - DataType::Timestamp(unit, _) if *unit == TimeUnit::Second => { - make_string!(TimestampSecondArray, column, row) - } - DataType::Timestamp(unit, _) if *unit == TimeUnit::Millisecond => { - make_string!(TimestampMillisecondArray, column, row) - } - DataType::Timestamp(unit, _) if *unit == TimeUnit::Microsecond => { - make_string!(TimestampMicrosecondArray, column, row) - } - DataType::Timestamp(unit, _) if *unit == TimeUnit::Nanosecond => { - make_string!(TimestampNanosecondArray, column, row) - } - DataType::Date32(_) => make_string!(Date32Array, column, row), - DataType::Date64(_) => make_string!(Date64Array, column, row), - DataType::Time32(unit) if *unit == TimeUnit::Second => { - make_string!(Time32SecondArray, column, row) - } - DataType::Time32(unit) if *unit == TimeUnit::Millisecond => { - make_string!(Time32MillisecondArray, column, row) - } - DataType::Time32(unit) if *unit == TimeUnit::Microsecond => { - make_string!(Time64MicrosecondArray, column, row) - } - DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => { - make_string!(Time64NanosecondArray, column, row) - } - _ => Err(ExecutionError::ExecutionError(format!( - "Unsupported {:?} type for repl.", - column.data_type() - ))), - } -} diff --git a/rust/datafusion/src/lib.rs b/rust/datafusion/src/lib.rs index fb4e5af303f03..4db66a2f0217d 100644 --- a/rust/datafusion/src/lib.rs +++ b/rust/datafusion/src/lib.rs @@ -36,6 +36,7 @@ pub mod logicalplan; pub mod optimizer; pub mod sql; pub mod table; +pub mod utils; #[cfg(test)] pub mod test; diff --git a/rust/datafusion/src/utils.rs b/rust/datafusion/src/utils.rs new file mode 100644 index 0000000000000..826d6e2362930 --- /dev/null +++ b/rust/datafusion/src/utils.rs @@ -0,0 +1,173 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Utilities for printing record batches + +use arrow::array; +use arrow::datatypes::{DataType, TimeUnit}; +use arrow::record_batch::RecordBatch; + +use prettytable::format; +use prettytable::{Cell, Row, Table}; + +use crate::error::{ExecutionError, Result}; + +///! Print a series of record batches to stdout +pub fn print_batches(results: &Vec) -> Result<()> { + create_table(results)?.printstd(); + Ok(()) +} + +///! Convert a series of record batches into a table +pub fn create_table(results: &Vec) -> Result { + let mut table = Table::new(); + table.set_format(*format::consts::FORMAT_NO_LINESEP_WITH_TITLE); + + if results.is_empty() { + return Ok(table); + } + + let schema = results[0].schema(); + + let mut header = Vec::new(); + for field in schema.fields() { + header.push(Cell::new(&field.name())); + } + table.set_titles(Row::new(header)); + + for batch in results { + for row in 0..batch.num_rows() { + let mut cells = Vec::new(); + for col in 0..batch.num_columns() { + let column = batch.column(col); + cells.push(Cell::new(&array_value_to_string(column.clone(), row)?)); + } + table.add_row(Row::new(cells)); + } + } + + Ok(table) +} + +macro_rules! make_string { + ($array_type:ty, $column: ident, $row: ident) => {{ + Ok($column + .as_any() + .downcast_ref::<$array_type>() + .unwrap() + .value($row) + .to_string()) + }}; +} + +/// Get the value at the given row in an array as a string +pub fn array_value_to_string(column: array::ArrayRef, row: usize) -> Result { + match column.data_type() { + DataType::Utf8 => Ok(column + .as_any() + .downcast_ref::() + .unwrap() + .value(row) + .to_string()), + DataType::Boolean => make_string!(array::BooleanArray, column, row), + DataType::Int16 => make_string!(array::Int16Array, column, row), + DataType::Int32 => make_string!(array::Int32Array, column, row), + DataType::Int64 => make_string!(array::Int64Array, column, row), + DataType::UInt8 => make_string!(array::UInt8Array, column, row), + DataType::UInt16 => make_string!(array::UInt16Array, column, row), + DataType::UInt32 => make_string!(array::UInt32Array, column, row), + DataType::UInt64 => make_string!(array::UInt64Array, column, row), + DataType::Float16 => make_string!(array::Float32Array, column, row), + DataType::Float32 => make_string!(array::Float32Array, column, row), + DataType::Float64 => make_string!(array::Float64Array, column, row), + DataType::Timestamp(unit, _) if *unit == TimeUnit::Second => { + make_string!(array::TimestampSecondArray, column, row) + } + DataType::Timestamp(unit, _) if *unit == TimeUnit::Millisecond => { + make_string!(array::TimestampMillisecondArray, column, row) + } + DataType::Timestamp(unit, _) if *unit == TimeUnit::Microsecond => { + make_string!(array::TimestampMicrosecondArray, column, row) + } + DataType::Timestamp(unit, _) if *unit == TimeUnit::Nanosecond => { + make_string!(array::TimestampNanosecondArray, column, row) + } + DataType::Date32(_) => make_string!(array::Date32Array, column, row), + DataType::Date64(_) => make_string!(array::Date64Array, column, row), + DataType::Time32(unit) if *unit == TimeUnit::Second => { + make_string!(array::Time32SecondArray, column, row) + } + DataType::Time32(unit) if *unit == TimeUnit::Millisecond => { + make_string!(array::Time32MillisecondArray, column, row) + } + DataType::Time32(unit) if *unit == TimeUnit::Microsecond => { + make_string!(array::Time64MicrosecondArray, column, row) + } + DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => { + make_string!(array::Time64NanosecondArray, column, row) + } + _ => Err(ExecutionError::ExecutionError(format!( + "Unsupported {:?} type for repl.", + column.data_type() + ))), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array; + use arrow::datatypes::{DataType, Field, Schema}; + use std::sync::Arc; + + #[test] + fn table() -> Result<()> { + // define a schema. + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Int32, false), + ])); + + // define data. + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(array::StringArray::from(vec!["a", "b", "c", "d"])), + Arc::new(array::Int32Array::from(vec![1, 10, 10, 100])), + ], + )?; + + let table = create_table(&vec![batch])?.to_string(); + + let expected = vec![ + "+---+-----+", + "| a | b |", + "+---+-----+", + "| a | 1 |", + "| b | 10 |", + "| c | 10 |", + "| d | 100 |", + "+---+-----+", + ]; + + let actual: Vec<&str> = table.lines().collect(); + + assert_eq!(expected, actual); + + Ok(()) + } +}