From 565465a02328875340d63046245637a3544ce2f1 Mon Sep 17 00:00:00 2001 From: Ilya Biryukov Date: Tue, 5 Oct 2021 16:58:57 +0300 Subject: [PATCH] fix(cubestore): fix parquet statistics for string columns The arrow code writes statistics with a different ordering. Tweak it in our fork to use lexicographical order. --- rust/Cargo.lock | 6 +- rust/cubestore/src/table/parquet.rs | 119 +++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 4 deletions(-) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 73aa2f42b65be..496e7aed672a5 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -138,7 +138,7 @@ checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" [[package]] name = "arrow" version = "5.0.0" -source = "git+https://github.com/cube-js/arrow-rs?branch=cube#0293429456f4464c7065075d909dce4269e477c4" +source = "git+https://github.com/cube-js/arrow-rs?branch=cube#f6a3075ab37239f655a982c2c15989d80d72a3db" dependencies = [ "bitflags", "chrono", @@ -161,7 +161,7 @@ dependencies = [ [[package]] name = "arrow-flight" version = "5.0.0" -source = "git+https://github.com/cube-js/arrow-rs?branch=cube#0293429456f4464c7065075d909dce4269e477c4" +source = "git+https://github.com/cube-js/arrow-rs?branch=cube#f6a3075ab37239f655a982c2c15989d80d72a3db" dependencies = [ "arrow", "base64 0.13.0", @@ -2884,7 +2884,7 @@ dependencies = [ [[package]] name = "parquet" version = "5.0.0" -source = "git+https://github.com/cube-js/arrow-rs?branch=cube#0293429456f4464c7065075d909dce4269e477c4" +source = "git+https://github.com/cube-js/arrow-rs?branch=cube#f6a3075ab37239f655a982c2c15989d80d72a3db" dependencies = [ "arrow", "base64 0.13.0", diff --git a/rust/cubestore/src/table/parquet.rs b/rust/cubestore/src/table/parquet.rs index 70cedad8af210..db7d4c877bdb2 100644 --- a/rust/cubestore/src/table/parquet.rs +++ b/rust/cubestore/src/table/parquet.rs @@ -77,11 +77,107 @@ mod tests { use crate::table::parquet::{arrow_schema, ParquetTableStore}; use crate::table::{Row, TableValue}; use crate::util::decimal::Decimal; - use arrow::array::BooleanArray; + use arrow::array::{ + ArrayRef, BooleanArray, Float64Array, Int64Array, Int64Decimal4Array, StringArray, + TimestampMicrosecondArray, + }; use arrow::record_batch::RecordBatch; + use itertools::Itertools; + use parquet::data_type::DataType; + use parquet::file::reader::FileReader; + use parquet::file::reader::SerializedFileReader; + use parquet::file::statistics::{Statistics, TypedStatistics}; + use pretty_assertions::assert_eq; use std::sync::Arc; use tempfile::NamedTempFile; + #[test] + fn column_statistics() { + let index = Index::try_new( + "table".to_string(), + 1, + vec![ + Column::new("str".to_string(), ColumnType::String, 0), + Column::new("int".to_string(), ColumnType::Int, 1), + Column::new("time".to_string(), ColumnType::Timestamp, 2), + Column::new( + "decimal".to_string(), + ColumnType::Decimal { + scale: 4, + precision: 5, + }, + 3, + ), + Column::new("float".to_string(), ColumnType::Float, 4), + Column::new("bool".to_string(), ColumnType::Boolean, 5), + ], + 6, + ) + .unwrap(); + + let dest_file = NamedTempFile::new().unwrap(); + let store = ParquetTableStore::new(index, ROW_GROUP_SIZE); + + let data: Vec = vec![ + Arc::new(StringArray::from(vec![ + Some("b"), + None, + Some("ab"), + Some("abc"), + ])), + Arc::new(Int64Array::from(vec![None, Some(3), Some(1), Some(2)])), + Arc::new(TimestampMicrosecondArray::from(vec![ + Some(6), + Some(4), + None, + Some(5), + ])), + Arc::new(Int64Decimal4Array::from(vec![ + Some(9), + Some(7), + Some(8), + None, + ])), + Arc::new(Float64Array::from(vec![ + Some(3.3), + None, + Some(1.1), + Some(2.2), + ])), + Arc::new(BooleanArray::from(vec![ + None, + Some(true), + Some(false), + Some(true), + ])), + ]; + // TODO: check floats use total_cmp. + + store + .write_data(dest_file.path().to_str().unwrap(), data) + .unwrap(); + + let r = SerializedFileReader::new(dest_file.into_file()).unwrap(); + + assert_eq!(r.num_row_groups(), 1); + let columns = r.metadata().row_group(0).columns(); + let columns = columns + .iter() + .map(|c| print_min_max(c.statistics())) + .join("\n"); + + assert_eq!( + columns, + // strings shown as byte arrays. 97, 98, 99 are codes for 'a', 'b', 'c'. + "min: [97, 98], max: [98]\ + \nmin: 1, max: 3\ + \nmin: 4, max: 6\ + \nmin: 7, max: 9\ + \nmin: 1.1, max: 3.3\ + \nmin: false, max: true" + ); + } + #[tokio::test] async fn gutter() { let store = ParquetTableStore { @@ -273,4 +369,25 @@ mod tests { let r = concat_record_batches(&w.read_columns(file).unwrap()); assert_eq_columns!(r.columns(), &data); } + + fn print_min_max_typed(s: &TypedStatistics) -> String { + format!("min: {}, max: {}", s.min(), s.max()) + } + + fn print_min_max(s: Option<&Statistics>) -> String { + let s = match s { + Some(s) => s, + None => return "".to_string(), + }; + match s { + Statistics::Boolean(t) => print_min_max_typed(t), + Statistics::Int32(t) => print_min_max_typed(t), + Statistics::Int64(t) => print_min_max_typed(t), + Statistics::Int96(t) => print_min_max_typed(t), + Statistics::Float(t) => print_min_max_typed(t), + Statistics::Double(t) => print_min_max_typed(t), + Statistics::ByteArray(t) => print_min_max_typed(t), + Statistics::FixedLenByteArray(t) => print_min_max_typed(t), + } + } }