Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet: derive boundary order when writing #5110

Merged
merged 5 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
69 changes: 38 additions & 31 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -610,19 +610,21 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}

/// Update the column index and offset index when adding the data page
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) {
fn update_column_offset_index(&mut self, page_statistics: Option<&Statistics>) -> Result<()> {
// update the column index
let null_page =
(self.page_metrics.num_buffered_rows as u64) == self.page_metrics.num_page_nulls;
// a page contains only null values,
// and writers have to set the corresponding entries in min_values and max_values to byte[0]
if null_page && self.column_index_builder.valid() {
self.column_index_builder.append(
null_page,
vec![0; 1],
vec![0; 1],
self.page_metrics.num_page_nulls as i64,
);
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
vec![0; 1],
vec![0; 1],
self.page_metrics.num_page_nulls as i64,
)?;
} else if self.column_index_builder.valid() {
// from page statistics
// If can't get the page statistics, ignore this column/offset index for this column chunk
Expand All @@ -634,28 +636,32 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// We only truncate if the data is represented as binary
match self.descr.physical_type() {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
self.column_index_builder.append(
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
)?;
}
_ => {
self.column_index_builder.append(
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
);
self.column_index_builder
.append_with_boundary_check::<E::T>(
&self.descr,
null_page,
stat.min_bytes().to_vec(),
stat.max_bytes().to_vec(),
self.page_metrics.num_page_nulls as i64,
)?;
}
}
}
Expand All @@ -664,6 +670,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);
Ok(())
}

fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
Expand Down Expand Up @@ -715,7 +722,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};

// update column and offset index
self.update_column_offset_index(page_statistics.as_ref());
self.update_column_offset_index(page_statistics.as_ref())?;

let compressed_page = match self.props.writer_version() {
WriterVersion::PARQUET_1_0 => {
Expand Down Expand Up @@ -1065,7 +1072,7 @@ fn update_stat<T: ParquetValueType, F>(
}

/// Evaluate `a > b` according to underlying logical type.
fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
pub(crate) fn compare_greater<T: ParquetValueType>(descr: &ColumnDescriptor, a: &T, b: &T) -> bool {
if let Some(LogicalType::Integer { is_signed, .. }) = descr.logical_type() {
if !is_signed {
// need to compare unsigned
Expand Down Expand Up @@ -2569,7 +2576,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down Expand Up @@ -2636,7 +2643,7 @@ mod tests {
// column index
assert_eq!(1, column_index.null_pages.len());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
assert!(!column_index.null_pages[0]);
assert_eq!(0, column_index.null_counts.as_ref().unwrap()[0]);

Expand Down