Skip to content

Commit

Permalink
fix binary array print formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
houqp committed Jan 18, 2022
1 parent 60e869e commit 61ace8f
Show file tree
Hide file tree
Showing 15 changed files with 194 additions and 36 deletions.
4 changes: 2 additions & 2 deletions datafusion-examples/examples/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::Arc;
use arrow::io::flight::deserialize_schemas;
use arrow_format::flight::data::{flight_descriptor, FlightDescriptor, Ticket};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use datafusion::arrow::io::print;
use datafusion::arrow_print;
use std::collections::HashMap;

/// This example shows how to wrap DataFusion with `FlightService` to support looking up schema information for
Expand Down Expand Up @@ -74,7 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}

// print the results
print::print(&results);
println!("{}", arrow_print::write(&result));

Ok(())
}
4 changes: 3 additions & 1 deletion datafusion/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,13 @@ rand = "0.8"
num-traits = { version = "0.2", optional = true }
pyo3 = { version = "0.14", optional = true }
avro-schema = { version = "0.2", optional = true }
# used to print arrow arrays in a nice columnar format
comfy-table = { version = "5.0", default-features = false }

[dependencies.arrow]
package = "arrow2"
version="0.8"
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "io_print", "ahash", "compute"]
features = ["io_csv", "io_json", "io_parquet", "io_parquet_compression", "io_ipc", "ahash", "compute"]

[dev-dependencies]
criterion = "0.3"
Expand Down
151 changes: 151 additions & 0 deletions datafusion/src/arrow_print.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Fork of arrow::io::print to implement custom Binary Array formatting logic.

// adapted from https://github.com/jorgecarleitao/arrow2/blob/ef7937dfe56033c2cc491482c67587b52cd91554/src/array/display.rs
// see: https://github.com/jorgecarleitao/arrow2/issues/771

use arrow::{array::*, record_batch::RecordBatch};

use comfy_table::{Cell, Table};

macro_rules! dyn_display {
($array:expr, $ty:ty, $expr:expr) => {{
let a = $array.as_any().downcast_ref::<$ty>().unwrap();
Box::new(move |row: usize| format!("{}", $expr(a.value(row))))
}};
}

fn df_get_array_value_display<'a>(
array: &'a dyn Array,
) -> Box<dyn Fn(usize) -> String + 'a> {
use arrow::datatypes::DataType::*;
match array.data_type() {
Binary => dyn_display!(array, BinaryArray<i32>, |x: &[u8]| {
x.iter().fold("".to_string(), |mut acc, x| {
acc.push_str(&format!("{:02x}", x));
acc
})
}),
LargeBinary => dyn_display!(array, BinaryArray<i64>, |x: &[u8]| {
x.iter().fold("".to_string(), |mut acc, x| {
acc.push_str(&format!("{:02x}", x));
acc
})
}),
List(_) => {
let f = |x: Box<dyn Array>| {
let display = df_get_array_value_display(x.as_ref());
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
format!("[{}]", string_values.join(", "))
};
dyn_display!(array, ListArray<i32>, f)
}
FixedSizeList(_, _) => {
let f = |x: Box<dyn Array>| {
let display = df_get_array_value_display(x.as_ref());
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
format!("[{}]", string_values.join(", "))
};
dyn_display!(array, FixedSizeListArray, f)
}
LargeList(_) => {
let f = |x: Box<dyn Array>| {
let display = df_get_array_value_display(x.as_ref());
let string_values = (0..x.len()).map(display).collect::<Vec<String>>();
format!("[{}]", string_values.join(", "))
};
dyn_display!(array, ListArray<i64>, f)
}
Struct(_) => {
let a = array.as_any().downcast_ref::<StructArray>().unwrap();
let displays = a
.values()
.iter()
.map(|x| df_get_array_value_display(x.as_ref()))
.collect::<Vec<_>>();
Box::new(move |row: usize| {
let mut string = displays
.iter()
.zip(a.fields().iter().map(|f| f.name()))
.map(|(f, name)| (f(row), name))
.fold("{".to_string(), |mut acc, (v, name)| {
acc.push_str(&format!("{}: {}, ", name, v));
acc
});
if string.len() > 1 {
// remove last ", "
string.pop();
string.pop();
}
string.push('}');
string
})
}
_ => get_display(array),
}
}

/// Returns a function of index returning the string representation of the item of `array`.
/// This outputs an empty string on nulls.
pub fn df_get_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> String + 'a> {
let value_display = df_get_array_value_display(array);
Box::new(move |row| {
if array.is_null(row) {
"".to_string()
} else {
value_display(row)
}
})
}

/// Convert a series of record batches into a String
pub fn write(results: &[RecordBatch]) -> String {
let mut table = Table::new();
table.load_preset("||--+-++| ++++++");

if results.is_empty() {
return table.to_string();
}

let schema = results[0].schema();

let mut header = Vec::new();
for field in schema.fields() {
header.push(Cell::new(field.name()));
}
table.set_header(header);

for batch in results {
let displayes = batch
.columns()
.iter()
.map(|array| df_get_display(array.as_ref()))
.collect::<Vec<_>>();

for row in 0..batch.num_rows() {
let mut cells = Vec::new();
(0..batch.num_columns()).for_each(|col| {
let string = displayes[col](row);
cells.push(Cell::new(&string));
});
table.add_row(cells);
}
}
table.to_string()
}
5 changes: 2 additions & 3 deletions datafusion/src/execution/dataframe_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
use arrow::io::print;
use arrow::record_batch::RecordBatch;

use crate::physical_plan::{
Expand Down Expand Up @@ -168,14 +167,14 @@ impl DataFrame for DataFrameImpl {
/// Print results.
async fn show(&self) -> Result<()> {
let results = self.collect().await?;
print::print(&results);
print!("{}", crate::arrow_print::write(&results));
Ok(())
}

/// Print results and limit rows.
async fn show_limit(&self, num: usize) -> Result<()> {
let results = self.limit(num)?.collect().await?;
print::print(&results);
print!("{}", crate::arrow_print::write(&results));
Ok(())
}

Expand Down
5 changes: 3 additions & 2 deletions datafusion/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
//! let pretty_results = datafusion::arrow::io::print::write(&results);
//! let pretty_results = datafusion::arrow_print::write(&results);
//!
//! let expected = vec![
//! "+---+--------------------------+",
Expand Down Expand Up @@ -92,7 +92,7 @@
//! let results: Vec<RecordBatch> = df.collect().await?;
//!
//! // format the results
//! let pretty_results = datafusion::arrow::io::print::write(&results);
//! let pretty_results = datafusion::arrow_print::write(&results);
//!
//! let expected = vec![
//! "+---+----------------+",
Expand Down Expand Up @@ -229,6 +229,7 @@ pub mod variable;
pub use arrow;
pub use parquet;

pub mod arrow_print;
mod arrow_temporal_util;

pub mod field_util;
Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/file_format/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ impl ExecutionPlan for CsvExec {
mod tests {
use super::*;
use crate::{
assert_batches_eq,
datasource::object_store::local::{local_unpartitioned_file, LocalFileSystem},
scalar::ScalarValue,
test_util::aggr_test_schema,
Expand Down Expand Up @@ -298,7 +299,7 @@ mod tests {
"+----+-----+------------+",
];

crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
Ok(())
}

Expand Down Expand Up @@ -343,7 +344,7 @@ mod tests {
"+----+----+-----+--------+------------+----------------------+-----+-------+------------+----------------------+-------------+---------------------+--------------------------------+",
];

crate::assert_batches_eq!(expected, &[batch]);
assert_batches_eq!(expected, &[batch]);

Ok(())
}
Expand Down Expand Up @@ -396,7 +397,7 @@ mod tests {
"| b | 2021-10-26 |",
"+----+------------+",
];
crate::assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
assert_batches_eq!(expected, &[batch_slice(&batch, 0, 5)]);
Ok(())
}

Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/file_format/file_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,7 @@ mod tests {

use super::*;
use crate::{
assert_batches_eq,
error::Result,
test::{make_partition, object_store::TestObjectStore},
};
Expand Down Expand Up @@ -230,7 +231,7 @@ mod tests {
let batches = create_and_collect(None).await;

#[rustfmt::skip]
crate::assert_batches_eq!(&[
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
Expand All @@ -254,7 +255,7 @@ mod tests {
async fn with_limit_between_files() -> Result<()> {
let batches = create_and_collect(Some(5)).await;
#[rustfmt::skip]
crate::assert_batches_eq!(&[
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
Expand All @@ -273,7 +274,7 @@ mod tests {
async fn with_limit_at_middle_of_batch() -> Result<()> {
let batches = create_and_collect(Some(6)).await;
#[rustfmt::skip]
crate::assert_batches_eq!(&[
assert_batches_eq!(&[
"+---+",
"| i |",
"+---+",
Expand Down
7 changes: 4 additions & 3 deletions datafusion/src/physical_plan/file_format/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ fn create_dict_array(
#[cfg(test)]
mod tests {
use crate::{
assert_batches_eq,
test::{build_table_i32, columns, object_store::TestObjectStore},
test_util::aggr_test_schema,
};
Expand Down Expand Up @@ -399,7 +400,7 @@ mod tests {
"| 2 | 0 | 12 | 2021 | 26 |",
"+---+----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
assert_batches_eq!(expected, &[projected_batch]);

// project another batch that is larger than the previous one
let file_batch = build_table_i32(
Expand Down Expand Up @@ -429,7 +430,7 @@ mod tests {
"| 9 | -6 | 16 | 2021 | 27 |",
"+---+-----+----+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
assert_batches_eq!(expected, &[projected_batch]);

// project another batch that is smaller than the previous one
let file_batch = build_table_i32(
Expand Down Expand Up @@ -457,7 +458,7 @@ mod tests {
"| 3 | 4 | 6 | 2021 | 28 |",
"+---+---+---+------+-----+",
];
crate::assert_batches_eq!(expected, &[projected_batch]);
assert_batches_eq!(expected, &[projected_batch]);
}

// sets default for configs that play no role in projections
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ fn read_partition(

#[cfg(test)]
mod tests {
use crate::assert_batches_eq;
use crate::datasource::{
file_format::{parquet::ParquetFormat, FileFormat},
object_store::local::{
Expand Down Expand Up @@ -566,7 +567,7 @@ mod tests {
"| 1 | false | 1 | 10 |",
"+----+----------+-------------+-------+",
];
crate::assert_batches_eq!(expected, &[batch]);
assert_batches_eq!(expected, &[batch]);

let batch = results.next().await;
assert!(batch.is_none());
Expand Down
3 changes: 2 additions & 1 deletion datafusion/src/physical_plan/hash_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,10 +1023,11 @@ mod tests {
use futures::FutureExt;

use super::*;
use crate::assert_batches_sorted_eq;
use crate::physical_plan::common;
use crate::physical_plan::expressions::{col, Avg};
use crate::test::assert_is_pending;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::{assert_batches_sorted_eq, physical_plan::common};

use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec;

Expand Down
Loading

0 comments on commit 61ace8f

Please sign in to comment.