Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: derive boundary order when writing #5110

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use bytes::Bytes;
use half::f16;

use crate::bloom_filter::Sbbf;
use crate::format::{ColumnIndex, OffsetIndex};
use crate::format::{BoundaryOrder, ColumnIndex, OffsetIndex};
use std::collections::{BTreeSet, VecDeque};
use std::str;

Expand Down Expand Up @@ -228,6 +228,12 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
// column index and offset index
column_index_builder: ColumnIndexBuilder,
offset_index_builder: OffsetIndexBuilder,
// Below fields used to incrementally check boundary order across data pages
// We assume they are ascending/descending until proven wrong
data_page_boundary_ascending: bool,
data_page_boundary_descending: bool,
/// (min, max)
latest_non_null_data_page_min_max: Option<(E::T, E::T)>,
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
}

impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Expand Down Expand Up @@ -279,6 +285,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
column_index_builder: ColumnIndexBuilder::new(),
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
data_page_boundary_ascending: true,
data_page_boundary_descending: true,
latest_non_null_data_page_min_max: None,
}
}

Expand Down Expand Up @@ -467,6 +476,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let metadata = self.write_column_metadata()?;
self.page_writer.close()?;

let boundary_order = match (
self.data_page_boundary_ascending,
self.data_page_boundary_descending,
) {
// If the lists are composed of equal elements then will be marked as ascending
// (Also the case if all pages are null pages)
(true, true) => BoundaryOrder::ASCENDING,
(true, false) => BoundaryOrder::ASCENDING,
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
(false, true) => BoundaryOrder::DESCENDING,
(false, false) => BoundaryOrder::UNORDERED,
};
self.column_index_builder.set_boundary_order(boundary_order);

let column_index = self
.column_index_builder
.valid()
Expand Down Expand Up @@ -703,6 +725,35 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
(Some(min), Some(max)) => {
update_min(&self.descr, &min, &mut self.column_metrics.min_column_value);
update_max(&self.descr, &max, &mut self.column_metrics.max_column_value);

// Check if min/max are still ascending/descending across pages
// Null pages aren't considered in this sort order
let null_page = (self.page_metrics.num_buffered_rows as u64)
== self.page_metrics.num_page_nulls;
if !null_page {
if let Some((latest_min, latest_max)) = &self.latest_non_null_data_page_min_max
Jefffrey marked this conversation as resolved.
Show resolved Hide resolved
{
if self.data_page_boundary_ascending {
// If latest min/max are greater than new min/max then not ascending anymore
let not_ascending = compare_greater(&self.descr, latest_min, &min)
|| compare_greater(&self.descr, latest_max, &max);
if not_ascending {
self.data_page_boundary_ascending = false;
}
}

if self.data_page_boundary_descending {
// If new min/max are greater than latest min/max then not descending anymore
let not_descending = compare_greater(&self.descr, &min, latest_min)
|| compare_greater(&self.descr, &max, latest_max);
if not_descending {
self.data_page_boundary_descending = false;
}
}
}
self.latest_non_null_data_page_min_max = Some((min.clone(), max.clone()));
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried putting the incremental check here instead of inside update_column_offset_index(..) as I couldn't figure out an easy way to get the T: ParquetValueType out of a Statistics enum

One caveat about putting this check here is that it compares the min/maxes before truncation occurs, though I think this should still be ok.

Copy link
Contributor

@tustvold tustvold Nov 26, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whilst I think this is correct, perhaps you could just change update_column_offset_index to instead take Option<&ValueStatistics<T>>? This would likely make the existing logic faster, and would make this logic perhaps slightly easier to follow - it is a little surprising that boundary_order is being updated outside of update_column_offset_index


Some(Statistics::new(
Some(min),
Some(max),
Expand Down Expand Up @@ -2569,7 +2620,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down Expand Up @@ -2636,7 +2687,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down
9 changes: 6 additions & 3 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -885,9 +885,8 @@ pub struct ColumnIndexBuilder {
null_pages: Vec<bool>,
min_values: Vec<Vec<u8>>,
max_values: Vec<Vec<u8>>,
// TODO: calc the order for all pages in this column
boundary_order: BoundaryOrder,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
// If one page can't get build index, need to ignore all index in this column
valid: bool,
}
Expand All @@ -904,8 +903,8 @@ impl ColumnIndexBuilder {
null_pages: Vec::new(),
min_values: Vec::new(),
max_values: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
null_counts: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
valid: true,
}
}
Expand All @@ -923,6 +922,10 @@ impl ColumnIndexBuilder {
self.null_counts.push(null_count);
}

pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
self.boundary_order = boundary_order;
}

pub fn to_invalid(&mut self) {
self.valid = false;
}
Expand Down
Loading