From 34a816d4fc2a9e0e097b1c41df788612dd5c8e61 Mon Sep 17 00:00:00 2001
From: Jeffrey <22608443+Jefffrey@users.noreply.github.com>
Date: Tue, 28 Nov 2023 22:18:42 +1100
Subject: [PATCH] Parquet: derive boundary order when writing (#5110)

* Parquet: derive boundary order when writing

* Fix

* Refactor boundary check location

* Fix

* Refactor according to review
---
 parquet/src/column/writer/mod.rs | 239 ++++++++++++++++++++++++++++---
 parquet/src/file/metadata.rs     |   9 +-
 2 files changed, 222 insertions(+), 26 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 11c39685911..14b8655091e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
 use half::f16;
 
 use crate::bloom_filter::Sbbf;
-use crate::format::{ColumnIndex, OffsetIndex};
+use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
 use std::collections::{BTreeSet, VecDeque};
 use std::str;
 
@@ -228,6 +228,13 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
     // column index and offset index
     column_index_builder: ColumnIndexBuilder,
     offset_index_builder: OffsetIndexBuilder,
+
+    // Below fields used to incrementally check boundary order across data pages.
+    // We assume they are ascending/descending until proven wrong.
+    data_page_boundary_ascending: bool,
+    data_page_boundary_descending: bool,
+    /// (min, max)
+    last_non_null_data_page_min_max: Option<(E::T, E::T)>,
 }
 
 impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
@@ -279,6 +286,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             column_index_builder: ColumnIndexBuilder::new(),
             offset_index_builder: OffsetIndexBuilder::new(),
             encodings,
+            data_page_boundary_ascending: true,
+            data_page_boundary_descending: true,
+            last_non_null_data_page_min_max: None,
         }
     }
 
@@ -467,6 +477,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         let metadata = self.write_column_metadata()?;
         self.page_writer.close()?;
 
+        let boundary_order = match (
+            self.data_page_boundary_ascending,
+            self.data_page_boundary_descending,
+        ) {
+            // If the lists are composed of equal elements then will be marked as ascending
+            // (Also the case if all pages are null pages)
+            (true, _) => BoundaryOrder::ASCENDING,
+            (false, true) => BoundaryOrder::DESCENDING,
+            (false, false) => BoundaryOrder::UNORDERED,
+        };
+        self.column_index_builder.set_boundary_order(boundary_order);
+
         let column_index = self
             .column_index_builder
             .valid()
@@ -610,7 +632,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
     }
 
     /// Update the column index and offset index when adding the data page
-    fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
+    fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
         // update the column index
         let null_page =
             (self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -631,6 +653,30 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 self.column_index_builder.to_invalid();
             }
             Some(stat) => {
+                // Check if min/max are still ascending/descending across pages
+                let new_min = stat.min();
+                let new_max = stat.max();
+                if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
+                    if self.data_page_boundary_ascending {
+                        // If last min/max are greater than new min/max then not ascending anymore
+                        let not_ascending = compare_greater(&self.descr, last_min, new_min)
+                            || compare_greater(&self.descr, last_max, new_max);
+                        if not_ascending {
+                            self.data_page_boundary_ascending = false;
+                        }
+                    }
+
+                    if self.data_page_boundary_descending {
+                        // If new min/max are greater than last min/max then not descending anymore
+                        let not_descending = compare_greater(&self.descr, new_min, last_min)
+                            || compare_greater(&self.descr, new_max, last_max);
+                        if not_descending {
+                            self.data_page_boundary_descending = false;
+                        }
+                    }
+                }
+                self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));
+
                 // We only truncate if the data is represented as binary
                 match self.descr.physical_type() {
                     Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
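For intuition, the ascending/descending bookkeeping above reduces to a small piece of logic that is easy to check in isolation. The following is an illustrative, self-contained sketch (not part of the patch): plain `i32` page statistics and a local `BoundaryOrder` enum stand in for the generic `E::T` values, `compare_greater` and the Thrift-generated type, but the flag updates and the final match mirror the hunks above.

    // Sketch only: the real writer compares values with compare_greater(), which
    // respects the column's sort order, and uses the Thrift-generated BoundaryOrder.
    #[derive(Debug, PartialEq)]
    enum BoundaryOrder {
        Ascending,
        Descending,
        Unordered,
    }

    /// Derive the boundary order from the (min, max) statistics of non-null pages.
    fn derive_boundary_order(non_null_pages: &[(i32, i32)]) -> BoundaryOrder {
        // Assume both orders hold until a pair of adjacent pages proves otherwise.
        let (mut ascending, mut descending) = (true, true);
        for w in non_null_pages.windows(2) {
            let ((last_min, last_max), (new_min, new_max)) = (w[0], w[1]);
            if last_min > new_min || last_max > new_max {
                ascending = false;
            }
            if new_min > last_min || new_max > last_max {
                descending = false;
            }
        }
        match (ascending, descending) {
            // Pages with equal bounds (or no non-null pages at all) count as ascending.
            (true, _) => BoundaryOrder::Ascending,
            (false, true) => BoundaryOrder::Descending,
            (false, false) => BoundaryOrder::Unordered,
        }
    }

    fn main() {
        assert_eq!(derive_boundary_order(&[(-10, 10), (-5, 11)]), BoundaryOrder::Ascending);
        assert_eq!(derive_boundary_order(&[(10, 11), (5, 11), (-5, 0)]), BoundaryOrder::Descending);
        assert_eq!(derive_boundary_order(&[(10, 11), (10, 11)]), BoundaryOrder::Ascending); // equal pages
        assert_eq!(derive_boundary_order(&[(1, 9), (2, 8)]), BoundaryOrder::Unordered); // min up, max down
        assert_eq!(derive_boundary_order(&[]), BoundaryOrder::Ascending); // all pages null
    }

Note that each comparison involves both min and max, which is why (1, 9) followed by (2, 8) is unordered even though each bound is monotonic on its own; the new tests below exercise exactly these cases.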
@@ -703,7 +749,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
             (Some(min), Some(max)) => {
                 update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
                 update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
-                Some(Statistics::new(
+                Some(ValueStatistics::new(
                     Some(min),
                     Some(max),
                     None,
@@ -716,6 +762,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
 
         // update column and offset index
         self.update_column_offset_index(page_statistics.as_ref());
+        let page_statistics = page_statistics.map(Statistics::from);
 
         let compressed_page = match self.props.writer_version() {
             WriterVersion::PARQUET_1_0 => {
@@ -2569,7 +2616,7 @@ mod tests {
         // column index
         assert_eq!(1, column_index.null_pages.len());
         assert_eq!(1, offset_index.page_locations.len());
-        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
         assert!(!column_index.null_pages[0]);
         assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
 
@@ -2636,7 +2683,7 @@ mod tests {
         // column index
         assert_eq!(1, column_index.null_pages.len());
         assert_eq!(1, offset_index.page_locations.len());
-        assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
+        assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
         assert!(!column_index.null_pages[0]);
         assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);
 
@@ -2891,6 +2938,158 @@ mod tests {
         assert!(incremented.is_none())
     }
 
+    #[test]
+    fn test_boundary_order() -> Result<()> {
+        let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
+        // min max both ascending
+        let column_close_result = write_multiple_pages::<Int32Type>(
+            &descr,
+            &[
+                &[Some(-10), Some(10)],
+                &[Some(-5), Some(11)],
+                &[None],
+                &[Some(-5), Some(11)],
+            ],
+        )?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
+
+        // min max both descending
+        let column_close_result = write_multiple_pages::<Int32Type>(
+            &descr,
+            &[
+                &[Some(10), Some(11)],
+                &[Some(5), Some(11)],
+                &[None],
+                &[Some(-5), Some(0)],
+            ],
+        )?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
+
+        // min max both equal
+        let column_close_result = write_multiple_pages::<Int32Type>(
+            &descr,
+            &[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
+        )?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
+
+        // only nulls
+        let column_close_result =
+            write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
+
+        // one page
+        let column_close_result =
+            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
+
+        // one non-null page
+        let column_close_result =
+            write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::ASCENDING);
+
+        // min max both unordered
+        let column_close_result = write_multiple_pages::<Int32Type>(
+            &descr,
+            &[
+                &[Some(10), Some(11)],
+                &[Some(11), Some(16)],
+                &[None],
+                &[Some(-5), Some(0)],
+            ],
+        )?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
+
+        // min max both ordered in different orders
+        let column_close_result = write_multiple_pages::<Int32Type>(
+            &descr,
+            &[
+                &[Some(1), Some(9)],
+                &[Some(2), Some(8)],
+                &[None],
+                &[Some(3), Some(7)],
+            ],
+        )?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
+
+        Ok(())
+    }
+
+    #[test]
+    fn test_boundary_order_logical_type() -> Result<()> {
+        // ensure that logical types account for different sort order than underlying
+        // physical type representation
+        let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
+        let fba_descr = {
+            let tpe = SchemaType::primitive_type_builder(
+                "col",
+                FixedLenByteArrayType::get_physical_type(),
+            )
+            .with_length(2)
+            .build()?;
+            Arc::new(ColumnDescriptor::new(
+                Arc::new(tpe),
+                1,
+                0,
+                ColumnPath::from("col"),
+            ))
+        };
+
+        let values: &[&[Option<FixedLenByteArray>]] = &[
+            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
+            &[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
+            &[Some(FixedLenByteArray::from(ByteArray::from(
+                f16::NEG_ZERO,
+            )))],
+            &[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
+        ];
+
+        // f16 descending
+        let column_close_result =
+            write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::DESCENDING);
+
+        // same bytes, but fba unordered
+        let column_close_result =
+            write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
+        let boundary_order = column_close_result.column_index.unwrap().boundary_order;
+        assert_eq!(boundary_order, BoundaryOrder::UNORDERED);
+
+        Ok(())
+    }
+
+    fn write_multiple_pages<T: DataType>(
+        column_descr: &Arc<ColumnDescriptor>,
+        pages: &[&[Option<T::T>]],
+    ) -> Result<ColumnCloseResult> {
+        let column_writer = get_column_writer(
+            column_descr.clone(),
+            Default::default(),
+            get_test_page_writer(),
+        );
+        let mut writer = get_typed_column_writer::<T>(column_writer);
+
+        for &page in pages {
+            let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
+            let def_levels = page
+                .iter()
+                .map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
+                .collect::<Vec<_>>();
+            writer.write_batch(&values, Some(&def_levels), None)?;
+            writer.flush_data_pages()?;
+        }
+
+        writer.close()
+    }
+
     /// Performs write-read roundtrip with randomly generated values and levels.
     /// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
     /// for a column.
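The `test_boundary_order_logical_type` case above hinges on sort order rather than on the stored bytes: a FLOAT16 column is compared as signed numbers, while a FIXED_LEN_BYTE_ARRAY column with no logical type falls back to an unsigned, byte-wise comparison of the sign-magnitude IEEE 754 encoding. A rough standalone illustration (it only assumes the `half` crate, which this module already imports; comparing the bit pattern as a `u16` stands in for the byte-wise order, and for these particular values the conclusion is the same):

    use half::f16;

    fn main() {
        // One value per page, as in the test above.
        let vals = [f16::ONE, f16::ZERO, f16::NEG_ZERO, f16::NEG_ONE];

        // Compared numerically (Float16 logical type), the sequence never
        // increases, so the derived boundary order is DESCENDING.
        assert!(vals.windows(2).all(|w| w[0] >= w[1]));

        // The sign-magnitude bit patterns are not monotonic in either direction
        // under an unsigned comparison, so the plain FBA column is UNORDERED.
        let bits: Vec<u16> = vals.iter().map(|v| v.to_bits()).collect();
        assert!(bits[0] > bits[1]); //  1.0 (0x3C00) >  0.0 (0x0000): not ascending
        assert!(bits[2] < bits[3]); // -0.0 (0x8000) < -1.0 (0xBC00): not descending
    }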
@@ -3197,8 +3396,7 @@ mod tests {
     ) -> ValueStatistics<FixedLenByteArray> {
         let page_writer = get_test_page_writer();
         let props = Default::default();
-        let mut writer =
-            get_test_float16_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
+        let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
         writer.write_batch(values, None, None).unwrap();
 
         let metadata = writer.close().unwrap().metadata;
@@ -3209,30 +3407,25 @@ mod tests {
         }
     }
 
-    fn get_test_float16_column_writer<T: DataType>(
+    fn get_test_float16_column_writer(
         page_writer: Box<dyn PageWriter>,
         max_def_level: i16,
         max_rep_level: i16,
         props: WriterPropertiesPtr,
-    ) -> ColumnWriterImpl<'static, T> {
-        let descr = Arc::new(get_test_float16_column_descr::<T>(
-            max_def_level,
-            max_rep_level,
-        ));
+    ) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
+        let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
         let column_writer = get_column_writer(descr, props, page_writer);
-        get_typed_column_writer::<T>(column_writer)
+        get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
     }
 
-    fn get_test_float16_column_descr<T: DataType>(
-        max_def_level: i16,
-        max_rep_level: i16,
-    ) -> ColumnDescriptor {
+    fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
         let path = ColumnPath::from("col");
-        let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
-            .with_length(2)
-            .with_logical_type(Some(LogicalType::Float16))
-            .build()
-            .unwrap();
+        let tpe =
+            SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
+                .with_length(2)
+                .with_logical_type(Some(LogicalType::Float16))
+                .build()
+                .unwrap();
         ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
     }
 
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index e57f666383d..a1f3c87d0a7 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -885,9 +885,8 @@ pub struct ColumnIndexBuilder {
     null_pages: Vec<bool>,
     min_values: Vec<Vec<u8>>,
     max_values: Vec<Vec<u8>>,
-    // TODO: calc the order for all pages in this column
-    boundary_order: BoundaryOrder,
     null_counts: Vec<i64>,
+    boundary_order: BoundaryOrder,
     // If one page can't get build index, need to ignore all index in this column
     valid: bool,
 }
@@ -904,8 +903,8 @@ impl ColumnIndexBuilder {
             null_pages: Vec::new(),
             min_values: Vec::new(),
             max_values: Vec::new(),
-            boundary_order: BoundaryOrder::UNORDERED,
             null_counts: Vec::new(),
+            boundary_order: BoundaryOrder::UNORDERED,
             valid: true,
         }
     }
@@ -923,6 +922,10 @@ impl ColumnIndexBuilder {
         self.null_counts.push(null_count);
     }
 
+    pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
+        self.boundary_order = boundary_order;
+    }
+
     pub fn to_invalid(&mut self) {
         self.valid = false;
     }
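For reviewers who want to see the change end to end: with statistics at the default page level, the column index written to a file now carries the derived order. A rough sketch against the public API (crate paths and builder methods as in the current `parquet`/`arrow-array` crates; not part of the patch, and the temp-file name is arbitrary):

    use std::{fs::File, sync::Arc};

    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use parquet::arrow::ArrowWriter;
    use parquet::file::page_index::index::Index;
    use parquet::file::properties::WriterProperties;
    use parquet::file::reader::FileReader;
    use parquet::file::serialized_reader::{ReadOptionsBuilder, SerializedFileReader};
    use parquet::format::BoundaryOrder;

    fn main() -> Result<(), Box<dyn std::error::Error>> {
        let path = std::env::temp_dir().join("boundary_order_demo.parquet");

        // Write a sorted Int32 column, forcing several data pages so the
        // per-page min/max values actually get compared.
        let col: ArrayRef = Arc::new(Int32Array::from_iter_values(0..10_000));
        let batch = RecordBatch::try_from_iter([("v", col)])?;
        let props = WriterProperties::builder()
            .set_data_page_row_count_limit(1_000)
            .build();
        let mut writer = ArrowWriter::try_new(File::create(&path)?, batch.schema(), Some(props))?;
        writer.write(&batch)?;
        writer.close()?;

        // Read the column index back; with this change it reports ASCENDING
        // instead of the previously hard-coded UNORDERED.
        let options = ReadOptionsBuilder::new().with_page_index().build();
        let reader = SerializedFileReader::new_with_options(File::open(&path)?, options)?;
        let column_index = reader.metadata().column_index().expect("column index written");
        match &column_index[0][0] {
            Index::INT32(index) => assert_eq!(index.boundary_order, BoundaryOrder::ASCENDING),
            other => panic!("unexpected index type: {other:?}"),
        }
        Ok(())
    }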