Parquet: derive boundary order when writing #5110

Merged

merged 5 commits into from Nov 28, 2023
239 changes: 216 additions & 23 deletions parquet/src/column/writer/mod.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

@@ -228,6 +228,13 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,

// The fields below are used to incrementally check boundary order across data pages.
// We assume they are ascending/descending until proven wrong.
data_page_boundary_ascending: bool,
data_page_boundary_descending: bool,
/// (min, max)
last_non_null_data_page_min_max: Option<(E::T, E::T)>,
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
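
For intuition, here is a minimal standalone sketch of how these fields derive the final boundary order; plain i32 pairs stand in for the writer's generic `E::T`, `compare_greater`, and null-page skipping:

```rust
/// Sketch only: derive a boundary order from per-page (min, max) pairs.
fn boundary_order(pages: &[(i32, i32)]) -> &'static str {
    // Assume both orders hold until a page proves otherwise.
    let (mut ascending, mut descending) = (true, true);
    let mut last: Option<(i32, i32)> = None;
    for &(min, max) in pages {
        if let Some((last_min, last_max)) = last {
            if last_min > min || last_max > max {
                ascending = false; // a page moved backwards
            }
            if min > last_min || max > last_max {
                descending = false; // a page moved forwards
            }
        }
        last = Some((min, max));
    }
    match (ascending, descending) {
        (true, _) => "ASCENDING", // all-equal (or single-page) columns count as ascending
        (false, true) => "DESCENDING",
        (false, false) => "UNORDERED",
    }
}
```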
@@ -279,6 +286,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
last_non_null_data_page_min_max: None,
}
}

@@ -467,6 +477,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let metadata = self.write_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
self.data_page_boundary_ascending,
self.data_page_boundary_descending,
) {
// If the lists are composed of equal elements then they will be marked as
// ascending (this is also the case if all pages are null pages)
(true, _) => BoundaryOrder::ASCENDING,
(false, true) => BoundaryOrder::DESCENDING,
(false, false) => BoundaryOrder::UNORDERED,
};
self.column_index_builder.set_boundary_order(boundary_order);

let column_index = self
.column_index_builder
.valid()
@@ -610,7 +632,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
fn update_column_offset_index(&mut self, page_statistics: Option<&ValueStatistics<E::T>>) {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
@@ -631,6 +653,30 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.column_index_builder.to_invalid();
}
Some(stat) => {
// Check if min/max are still ascending/descending across pages
let new_min = stat.min();
let new_max = stat.max();
if let Some((last_min, last_max)) = &self.last_non_null_data_page_min_max {
if self.data_page_boundary_ascending {
// If the last min/max are greater than the new min/max, the pages are no longer ascending
let not_ascending = compare_greater(&self.descr, last_min, new_min)
|| compare_greater(&self.descr, last_max, new_max);
if not_ascending {
self.data_page_boundary_ascending = false;
}
}

if self.data_page_boundary_descending {
// If the new min/max are greater than the last min/max, the pages are no longer descending
let not_descending = compare_greater(&self.descr, new_min, last_min)
|| compare_greater(&self.descr, new_max, last_max);
if not_descending {
self.data_page_boundary_descending = false;
}
}
}
self.last_non_null_data_page_min_max = Some((new_min.clone(), new_max.clone()));

// We only truncate if the data is represented as binary
match self.descr.physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
@@ -703,7 +749,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
(Some(min), Some(max)) => {
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);
Some(Statistics::new(
Some(ValueStatistics::new(
Some(min),
Some(max),
None,
@@ -716,6 +762,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref());
let page_statistics = page_statistics.map(Statistics::from);

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
@@ -2569,7 +2616,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

@@ -2636,7 +2683,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

@@ -2891,6 +2938,158 @@ mod tests {
assert!(incremented.is_none())
}

#[test]
fn test_boundary_order() -> Result<()> {
let descr = Arc::new(get_test_column_descr::<Int32Type>(1, 0));
// min max both ascending
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(-10), Some(10)],
&[Some(-5), Some(11)],
&[None],
Contributor:

Is there some documentation of this behaviour in the parquet spec, or failing that an example implementation doing something similar? Normally I would have expected a null to break the ordering, as it would break the ability to do a binary search.

Contributor Author:

There is this example test in arrow c++ that shows null pages don't have an effect on ordering:

https://github.com/apache/arrow/blob/862792132297f0ca519c83e524e59c7d685298e8/cpp/src/parquet/arrow/arrow_reader_writer_test.cc#L5597-L5610

Same here:

https://github.com/apache/arrow/blob/84c15da1997559c37841dc16f9e2c70c643dd9d2/cpp/src/parquet/page_index_test.cc#L567-L581

Though I do agree that the Parquet spec is lacking explicit documentation on how null pages are handled.

&[Some(-5), Some(11)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// min max both descending
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(10), Some(11)],
&[Some(5), Some(11)],
&[None],
&[Some(-5), Some(0)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

// min max both equal
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[&[Some(10), Some(11)], &[None], &[Some(10), Some(11)]],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// only nulls
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[None], &[None], &[None]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// one page
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// one non-null page
let column_close_result =
write_multiple_pages::<Int32Type>(&descr, &[&[Some(-10), Some(10)], &[None]])?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::ASCENDING);

// min max both unordered
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(10), Some(11)],
&[Some(11), Some(16)],
&[None],
&[Some(-5), Some(0)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

// min max both ordered in different orders
let column_close_result = write_multiple_pages::<Int32Type>(
&descr,
&[
&[Some(1), Some(9)],
&[Some(2), Some(8)],
&[None],
&[Some(3), Some(7)],
],
)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

Ok(())
}
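
The review thread above asks whether a null page should break the ordering. The rationale for skipping null pages is that a reader prunes with the non-null index entries only, so nulls do not impede a binary search. A hypothetical sketch of such pruning (illustrative only, not the arrow-rs reader API):

```rust
/// Find the first page that may contain `value`, given per-page
/// Some((min, max)) entries (None = null page) and ASCENDING boundary order.
fn first_candidate_page(pages: &[Option<(i32, i32)>], value: i32) -> Option<usize> {
    // Indices of non-null pages; their mins and maxes are ascending by assumption.
    let non_null: Vec<usize> = (0..pages.len()).filter(|&i| pages[i].is_some()).collect();
    // Binary search for the first non-null page whose max >= value.
    let pos = non_null.partition_point(|&i| pages[i].unwrap().1 < value);
    non_null.get(pos).copied().filter(|&i| pages[i].unwrap().0 <= value)
}
```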

#[test]
fn test_boundary_order_logical_type() -> Result<()> {
// ensure that logical types account for different sort order than underlying
// physical type representation
let f16_descr = Arc::new(get_test_float16_column_descr(1, 0));
let fba_descr = {
let tpe = SchemaType::primitive_type_builder(
"col",
FixedLenByteArrayType::get_physical_type(),
)
.with_length(2)
.build()?;
Arc::new(ColumnDescriptor::new(
Arc::new(tpe),
1,
0,
ColumnPath::from("col"),
))
};

let values: &[&[Option<FixedLenByteArray>]] = &[
&[Some(FixedLenByteArray::from(ByteArray::from(f16::ONE)))],
&[Some(FixedLenByteArray::from(ByteArray::from(f16::ZERO)))],
&[Some(FixedLenByteArray::from(ByteArray::from(
f16::NEG_ZERO,
)))],
&[Some(FixedLenByteArray::from(ByteArray::from(f16::NEG_ONE)))],
];

// f16 descending
let column_close_result =
write_multiple_pages::<FixedLenByteArrayType>(&f16_descr, values)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::DESCENDING);

// same bytes, but fba unordered
let column_close_result =
write_multiple_pages::<FixedLenByteArrayType>(&fba_descr, values)?;
let boundary_order = column_close_result.column_index.unwrap().boundary_order;
assert_eq!(boundary_order, BoundaryOrder::UNORDERED);

Ok(())
}
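
For intuition on why identical bytes sort differently under the two descriptors: Float16 statistics compare numerically, while a plain FIXED_LEN_BYTE_ARRAY compares as unsigned bytes. A small sketch, assuming (per the Parquet spec) that FLOAT16 is stored as a 2-byte little-endian array:

```rust
use half::f16; // the same crate the writer uses

fn main() {
    for v in [f16::ONE, f16::ZERO, f16::NEG_ZERO, f16::NEG_ONE] {
        println!("{v:>4} -> {:02x?}", v.to_le_bytes());
    }
    // Numerically: 1.0 > 0.0 >= -0.0 > -1.0                  => DESCENDING
    // As unsigned bytes: [00, 3c], [00, 00], [00, 80], [00, bc]
    //   3c > 00 (down), then 00 < 80 < bc (up)               => UNORDERED
}
```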

fn write_multiple_pages<T: DataType>(
column_descr: &Arc<ColumnDescriptor>,
pages: &[&[Option<T::T>]],
) -> Result<ColumnCloseResult> {
let column_writer = get_column_writer(
column_descr.clone(),
Default::default(),
get_test_page_writer(),
);
let mut writer = get_typed_column_writer::<T>(column_writer);

for &page in pages {
let values = page.iter().filter_map(Clone::clone).collect::<Vec<_>>();
let def_levels = page
.iter()
.map(|maybe_value| if maybe_value.is_some() { 1 } else { 0 })
.collect::<Vec<_>>();
writer.write_batch(&values, Some(&def_levels), None)?;
writer.flush_data_pages()?;
}

writer.close()
}

/// Performs write-read roundtrip with randomly generated values and levels.
/// `max_size` is maximum number of values or levels (if `max_def_level` > 0) to write
/// for a column.
@@ -3197,8 +3396,7 @@
) -> ValueStatistics<FixedLenByteArray> {
let page_writer = get_test_page_writer();
let props = Default::default();
let mut writer =
get_test_float16_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);
let mut writer = get_test_float16_column_writer(page_writer, 0, 0, props);
writer.write_batch(values, None, None).unwrap();

let metadata = writer.close().unwrap().metadata;
@@ -3209,30 +3407,25 @@
}
}

fn get_test_float16_column_writer<T: DataType>(
fn get_test_float16_column_writer(
page_writer: Box<dyn PageWriter>,
max_def_level: i16,
max_rep_level: i16,
props: WriterPropertiesPtr,
) -> ColumnWriterImpl<'static, T> {
let descr = Arc::new(get_test_float16_column_descr::<T>(
max_def_level,
max_rep_level,
));
) -> ColumnWriterImpl<'static, FixedLenByteArrayType> {
let descr = Arc::new(get_test_float16_column_descr(max_def_level, max_rep_level));
let column_writer = get_column_writer(descr, props, page_writer);
get_typed_column_writer::<T>(column_writer)
get_typed_column_writer::<FixedLenByteArrayType>(column_writer)
}

fn get_test_float16_column_descr<T: DataType>(
max_def_level: i16,
max_rep_level: i16,
) -> ColumnDescriptor {
fn get_test_float16_column_descr(max_def_level: i16, max_rep_level: i16) -> ColumnDescriptor {
let path = ColumnPath::from("col");
let tpe = SchemaType::primitive_type_builder("col", T::get_physical_type())
.with_length(2)
.with_logical_type(Some(LogicalType::Float16))
.build()
.unwrap();
let tpe =
SchemaType::primitive_type_builder("col", FixedLenByteArrayType::get_physical_type())
.with_length(2)
.with_logical_type(Some(LogicalType::Float16))
.build()
.unwrap();
ColumnDescriptor::new(Arc::new(tpe), max_def_level, max_rep_level, path)
}

9 changes: 6 additions & 3 deletions parquet/src/file/metadata.rs
@@ -885,9 +885,8 @@ pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
// TODO: calc the order for all pages in this column
boundary_order: BoundaryOrder,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
// If the index can't be built for one page, the index for the whole column must be ignored
valid: bool,
}
@@ -904,8 +903,8 @@ impl ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
null_counts: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
valid: true,
}
}
@@ -923,6 +922,10 @@ impl ColumnIndexBuilder {
self.null_counts.push(null_count);
}

pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
self.boundary_order = boundary_order;
}

pub fn to_invalid(&mut self) {
self.valid = false;
}