fix(cubestore): fix parquet statistics for string columns
The arrow code writes statistics with a different ordering. Tweak it in
our fork to use lexicographical order.
ilya-biryukov committed Oct 6, 2021
1 parent c1b2e10 commit 565465a
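
For context: parquet orders BYTE_ARRAY min/max statistics by unsigned, byte-wise lexicographic comparison, which is what the new test below asserts ("ab" < "abc" < "b", so min is [97, 98] and max is [98]). A minimal sketch of that ordering, with a hypothetical helper name; the actual fix lives in the forked arrow/parquet crates picked up via Cargo.lock, not in this file:

    // Illustrative only: byte-wise lexicographic min/max, the ordering
    // parquet expects for BYTE_ARRAY column statistics.
    fn lex_min_max<'a>(values: &[&'a [u8]]) -> Option<(&'a [u8], &'a [u8])> {
        // `&[u8]` implements `Ord` as unsigned, byte-wise lexicographic
        // comparison, so `min`/`max` yield exactly the parquet ordering.
        Some((*values.iter().min()?, *values.iter().max()?))
    }

    fn main() {
        // Mirrors the non-null string test data below: "b", "ab", "abc".
        let data: Vec<&[u8]> = vec![b"b", b"ab", b"abc"];
        let (min, max) = lex_min_max(&data).unwrap();
        assert_eq!(min, b"ab"); // rendered as [97, 98]
        assert_eq!(max, b"b");  // rendered as [98]
    }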
Showing 2 changed files with 121 additions and 4 deletions.
6 changes: 3 additions & 3 deletions rust/Cargo.lock

Some generated files are not rendered by default.

119 changes: 118 additions & 1 deletion rust/cubestore/src/table/parquet.rs
@@ -77,11 +77,107 @@ mod tests {
use crate::table::parquet::{arrow_schema, ParquetTableStore};
use crate::table::{Row, TableValue};
use crate::util::decimal::Decimal;
-use arrow::array::BooleanArray;
+use arrow::array::{
+    ArrayRef, BooleanArray, Float64Array, Int64Array, Int64Decimal4Array, StringArray,
+    TimestampMicrosecondArray,
+};
use arrow::record_batch::RecordBatch;
use itertools::Itertools;
use parquet::data_type::DataType;
use parquet::file::reader::FileReader;
use parquet::file::reader::SerializedFileReader;
use parquet::file::statistics::{Statistics, TypedStatistics};
use pretty_assertions::assert_eq;
use std::sync::Arc;
use tempfile::NamedTempFile;

#[test]
fn column_statistics() {
let index = Index::try_new(
"table".to_string(),
1,
vec![
Column::new("str".to_string(), ColumnType::String, 0),
Column::new("int".to_string(), ColumnType::Int, 1),
Column::new("time".to_string(), ColumnType::Timestamp, 2),
Column::new(
"decimal".to_string(),
ColumnType::Decimal {
scale: 4,
precision: 5,
},
3,
),
Column::new("float".to_string(), ColumnType::Float, 4),
Column::new("bool".to_string(), ColumnType::Boolean, 5),
],
6,
)
.unwrap();

let dest_file = NamedTempFile::new().unwrap();
let store = ParquetTableStore::new(index, ROW_GROUP_SIZE);

let data: Vec<ArrayRef> = vec![
Arc::new(StringArray::from(vec![
Some("b"),
None,
Some("ab"),
Some("abc"),
])),
Arc::new(Int64Array::from(vec![None, Some(3), Some(1), Some(2)])),
Arc::new(TimestampMicrosecondArray::from(vec![
Some(6),
Some(4),
None,
Some(5),
])),
Arc::new(Int64Decimal4Array::from(vec![
Some(9),
Some(7),
Some(8),
None,
])),
Arc::new(Float64Array::from(vec![
Some(3.3),
None,
Some(1.1),
Some(2.2),
])),
Arc::new(BooleanArray::from(vec![
None,
Some(true),
Some(false),
Some(true),
])),
];
// TODO: check floats use total_cmp.

store
.write_data(dest_file.path().to_str().unwrap(), data)
.unwrap();

let r = SerializedFileReader::new(dest_file.into_file()).unwrap();

assert_eq!(r.num_row_groups(), 1);
let columns = r.metadata().row_group(0).columns();
let columns = columns
.iter()
.map(|c| print_min_max(c.statistics()))
.join("\n");

assert_eq!(
columns,
// strings shown as byte arrays. 97, 98, 99 are codes for 'a', 'b', 'c'.
"min: [97, 98], max: [98]\
\nmin: 1, max: 3\
\nmin: 4, max: 6\
\nmin: 7, max: 9\
\nmin: 1.1, max: 3.3\
\nmin: false, max: true"
);
}

#[tokio::test]
async fn gutter() {
let store = ParquetTableStore {
@@ -273,4 +369,25 @@ mod tests {
let r = concat_record_batches(&w.read_columns(file).unwrap());
assert_eq_columns!(r.columns(), &data);
}

fn print_min_max_typed<T: DataType>(s: &TypedStatistics<T>) -> String {
format!("min: {}, max: {}", s.min(), s.max())
}

fn print_min_max(s: Option<&Statistics>) -> String {
let s = match s {
Some(s) => s,
None => return "<null>".to_string(),
};
match s {
Statistics::Boolean(t) => print_min_max_typed(t),
Statistics::Int32(t) => print_min_max_typed(t),
Statistics::Int64(t) => print_min_max_typed(t),
Statistics::Int96(t) => print_min_max_typed(t),
Statistics::Float(t) => print_min_max_typed(t),
Statistics::Double(t) => print_min_max_typed(t),
Statistics::ByteArray(t) => print_min_max_typed(t),
Statistics::FixedLenByteArray(t) => print_min_max_typed(t),
}
}
}
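
To run the new test locally (a sketch, assuming the standard cargo layout under rust/cubestore):

    cargo test column_statistics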
