ARROW-8264: [Rust] [DataFusion] Add utility for printing batches
This PR adds a utility for printing record batches and updates the examples to call it, making the examples more concise and their output more consistent.

The code was previously in the CLI but is now in the new utils module.

```
$ cargo run --example csv_sql
   Compiling datafusion v1.0.0-SNAPSHOT (/home/andy/git/andygrove/arrow/rust/datafusion)
    Finished dev [unoptimized + debuginfo] target(s) in 4.16s
     Running `target/debug/examples/csv_sql`
+----+----------------------+--------------------+
| c1 | MIN                  | MAX                |
+----+----------------------+--------------------+
| a  | 0.02182578039211991  | 0.9567595541247681 |
| e  | 0.01479305307777301  | 0.9965400387585364 |
| d  | 0.061029375346466685 | 0.9748360509016578 |
| c  | 0.2667177795079635   | 0.991517828651004  |
| b  | 0.04893135681998029  | 0.9185813970744787 |
+----+----------------------+--------------------+

```
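
The new utils module itself is not shown in this diff. As a rough idea only, here is a hypothetical sketch of how such a `print_batches` helper could be implemented with the prettytable crate, modeled on the table-building code removed from repl.rs below; the handled `DataType`s and the exact signature are assumptions, not the committed implementation.

```rust
// Hypothetical sketch only -- the committed utils.rs is not part of this diff.
use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::DataType;
use arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use prettytable::{Cell, Row, Table};

/// Print a slice of record batches as a single ASCII table.
pub fn print_batches(results: &[RecordBatch]) -> Result<()> {
    if results.is_empty() {
        return Ok(());
    }
    let mut table = Table::new();

    // header row built from the schema of the first batch
    let schema = results[0].schema();
    let header: Vec<Cell> = schema
        .fields()
        .iter()
        .map(|field| Cell::new(&field.name()))
        .collect();
    table.add_row(Row::new(header));

    // one table row per row in each batch
    for batch in results {
        for row in 0..batch.num_rows() {
            let mut cells = Vec::new();
            for col in 0..batch.num_columns() {
                let column = batch.column(col);
                // only the types used in the csv_sql example are handled here;
                // a real implementation would dispatch on all supported types
                let value = match column.data_type() {
                    DataType::Utf8 => column
                        .as_any()
                        .downcast_ref::<StringArray>()
                        .unwrap()
                        .value(row)
                        .to_string(),
                    DataType::Float64 => column
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .unwrap()
                        .value(row)
                        .to_string(),
                    other => format!("<unsupported type {:?}>", other),
                };
                cells.push(Cell::new(&value));
            }
            table.add_row(Row::new(cells));
        }
    }
    table.printstd();
    Ok(())
}
```

With a helper like this in place, each example collapses its result handling to a single `utils::print_batches(&results)?;` call, as the hunks below show.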

Closes #6754 from andygrove/datafusion-utils

Authored-by: Andy Grove <andygrove73@gmail.com>
Signed-off-by: Andy Grove <andygrove73@gmail.com>
andygrove committed Mar 31, 2020
1 parent 895f220 commit 7c7014b
Showing 8 changed files with 197 additions and 212 deletions.
48 changes: 5 additions & 43 deletions rust/datafusion/examples/csv_sql.rs
@@ -15,16 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

extern crate arrow;
extern crate datafusion;

use arrow::array::{Float64Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};

use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::utils;

/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
/// fetching results
@@ -33,7 +28,7 @@ fn main() -> Result<()> {
let mut ctx = ExecutionContext::new();

// define schema for data source (csv file)
let schema = Arc::new(Schema::new(vec![
let schema = Schema::new(vec![
Field::new("c1", DataType::Utf8, false),
Field::new("c2", DataType::UInt32, false),
Field::new("c3", DataType::Int8, false),
@@ -47,7 +42,7 @@ fn main() -> Result<()> {
Field::new("c11", DataType::Float32, false),
Field::new("c12", DataType::Float64, false),
Field::new("c13", DataType::Utf8, false),
]));
]);

let testdata = std::env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");

@@ -69,41 +64,8 @@ fn main() -> Result<()> {
// execute the query
let results = ctx.collect(plan.as_ref())?;

// iterate over the results
results.iter().for_each(|batch| {
println!(
"RecordBatch has {} rows and {} columns",
batch.num_rows(),
batch.num_columns()
);

let c1 = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

let min = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();

let max = batch
.column(2)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();

for i in 0..batch.num_rows() {
println!(
"{}, Min: {}, Max: {}",
c1.value(i),
min.value(i),
max.value(i),
);
}
});
// print the results
utils::print_batches(&results)?;

Ok(())
}
@@ -18,9 +18,11 @@
use std::convert::TryFrom;
use std::sync::Arc;

use arrow::array::Int32Array;
use arrow::datatypes::Schema;
use arrow::flight::flight_data_to_batch;

use datafusion::utils;

use flight::flight_descriptor;
use flight::flight_service_client::FlightServiceClient;
use flight::{FlightDescriptor, Ticket};
@@ -58,22 +60,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Schema: {:?}", schema);

// all the remaining stream messages should be dictionary and record batches
let mut results = vec![];
while let Some(flight_data) = stream.message().await? {
// the unwrap is infallible and thus safe
let record_batch = flight_data_to_batch(&flight_data, schema.clone())?.unwrap();

println!(
"record_batch has {} columns and {} rows",
record_batch.num_columns(),
record_batch.num_rows()
);
let column = record_batch.column(0);
let column = column
.as_any()
.downcast_ref::<Int32Array>()
.expect("Unable to get column");
println!("Column 1: {:?}", column);
results.push(record_batch);
}

// print the results
utils::print_batches(&results).unwrap();

Ok(())
}
File renamed without changes.
30 changes: 3 additions & 27 deletions rust/datafusion/examples/memory_table_api.rs
@@ -18,9 +18,6 @@
use std::boxed::Box;
use std::sync::Arc;

extern crate arrow;
extern crate datafusion;

use arrow::array::{Int32Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
@@ -29,6 +26,7 @@ use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::logicalplan::{Expr, ScalarValue};
use datafusion::utils;

/// This example demonstrates basic uses of the Table API on an in-memory table
fn main() -> Result<()> {
@@ -63,30 +61,8 @@ fn main() -> Result<()> {
// execute
let results = t.collect(&mut ctx, 10)?;

// print results
results.iter().for_each(|batch| {
println!(
"RecordBatch has {} rows and {} columns",
batch.num_rows(),
batch.num_columns()
);

let c1 = batch
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.expect("String type");

let c2 = batch
.column(1)
.as_any()
.downcast_ref::<Int32Array>()
.expect("Int type");

for i in 0..batch.num_rows() {
println!("{}, {}", c1.value(i), c2.value(i),);
}
});
// print the results
utils::print_batches(&results)?;

Ok(())
}
43 changes: 3 additions & 40 deletions rust/datafusion/examples/parquet_sql.rs
@@ -15,13 +15,9 @@
// specific language governing permissions and limitations
// under the License.

extern crate arrow;
extern crate datafusion;

use arrow::array::{Float64Array, Int32Array, StringArray};

use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use datafusion::utils;

/// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
/// fetching results
@@ -49,41 +45,8 @@ fn main() -> Result<()> {
// execute the query
let results = ctx.collect(plan.as_ref())?;

// iterate over the results
results.iter().for_each(|batch| {
println!(
"RecordBatch has {} rows and {} columns",
batch.num_rows(),
batch.num_columns()
);

let int = batch
.column(0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();

let double = batch
.column(1)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();

let date = batch
.column(2)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();

for i in 0..batch.num_rows() {
println!(
"Date: {}, Int: {}, Double: {}",
date.value(i),
int.value(i),
double.value(i)
);
}
});
// print the results
utils::print_batches(&results)?;

Ok(())
}
93 changes: 4 additions & 89 deletions rust/datafusion/src/bin/repl.rs
@@ -17,12 +17,10 @@

#![allow(bare_trait_objects)]

use arrow::array::*;
use arrow::datatypes::{DataType, TimeUnit};
use clap::{crate_version, App, Arg};
use datafusion::error::{ExecutionError, Result};
use datafusion::error::Result;
use datafusion::execution::context::ExecutionContext;
use prettytable::{Cell, Row, Table};
use datafusion::utils;
use rustyline::Editor;
use std::env;
use std::path::Path;
@@ -118,29 +116,9 @@ fn exec_and_print(
return Ok(());
}

let mut row_count = 0;
let mut table = Table::new();
let schema = results[0].schema();
utils::print_batches(&results)?;

let mut header = Vec::new();
for field in schema.fields() {
header.push(Cell::new(&field.name()));
}
table.add_row(Row::new(header));

for batch in results {
row_count += batch.num_rows();

for row in 0..batch.num_rows() {
let mut cells = Vec::new();
for col in 0..batch.num_columns() {
let column = batch.column(col);
cells.push(Cell::new(&str_value(column.clone(), row)?));
}
table.add_row(Row::new(cells));
}
}
table.printstd();
let row_count: usize = results.iter().map(|b| b.num_rows()).sum();

if row_count > 1 {
println!(
@@ -158,66 +136,3 @@

Ok(())
}

macro_rules! make_string {
($array_type:ty, $column: ident, $row: ident) => {{
Ok($column
.as_any()
.downcast_ref::<$array_type>()
.unwrap()
.value($row)
.to_string())
}};
}

fn str_value(column: ArrayRef, row: usize) -> Result<String> {
match column.data_type() {
DataType::Utf8 => Ok(column
.as_any()
.downcast_ref::<StringArray>()
.unwrap()
.value(row)
.to_string()),
DataType::Boolean => make_string!(BooleanArray, column, row),
DataType::Int16 => make_string!(Int16Array, column, row),
DataType::Int32 => make_string!(Int32Array, column, row),
DataType::Int64 => make_string!(Int64Array, column, row),
DataType::UInt8 => make_string!(UInt8Array, column, row),
DataType::UInt16 => make_string!(UInt16Array, column, row),
DataType::UInt32 => make_string!(UInt32Array, column, row),
DataType::UInt64 => make_string!(UInt64Array, column, row),
DataType::Float16 => make_string!(Float32Array, column, row),
DataType::Float32 => make_string!(Float32Array, column, row),
DataType::Float64 => make_string!(Float64Array, column, row),
DataType::Timestamp(unit, _) if *unit == TimeUnit::Second => {
make_string!(TimestampSecondArray, column, row)
}
DataType::Timestamp(unit, _) if *unit == TimeUnit::Millisecond => {
make_string!(TimestampMillisecondArray, column, row)
}
DataType::Timestamp(unit, _) if *unit == TimeUnit::Microsecond => {
make_string!(TimestampMicrosecondArray, column, row)
}
DataType::Timestamp(unit, _) if *unit == TimeUnit::Nanosecond => {
make_string!(TimestampNanosecondArray, column, row)
}
DataType::Date32(_) => make_string!(Date32Array, column, row),
DataType::Date64(_) => make_string!(Date64Array, column, row),
DataType::Time32(unit) if *unit == TimeUnit::Second => {
make_string!(Time32SecondArray, column, row)
}
DataType::Time32(unit) if *unit == TimeUnit::Millisecond => {
make_string!(Time32MillisecondArray, column, row)
}
DataType::Time32(unit) if *unit == TimeUnit::Microsecond => {
make_string!(Time64MicrosecondArray, column, row)
}
DataType::Time64(unit) if *unit == TimeUnit::Nanosecond => {
make_string!(Time64NanosecondArray, column, row)
}
_ => Err(ExecutionError::ExecutionError(format!(
"Unsupported {:?} type for repl.",
column.data_type()
))),
}
}
1 change: 1 addition & 0 deletions rust/datafusion/src/lib.rs
@@ -36,6 +36,7 @@ pub mod logicalplan;
pub mod optimizer;
pub mod sql;
pub mod table;
pub mod utils;

#[cfg(test)]
pub mod test;
