Skip to content

Commit

Permalink
Enable truncation of binary statistics columns (#5076)
Browse files Browse the repository at this point in the history
* changes needed to introduce min/max exactness

* implement truncation property and logic, tests

* format lints

* change min/max exact to be with... methods

* reduce code noise

* remove redundant clone

---------

Co-authored-by: Matthew Kemp <mkemp@drwholdings.com>
  • Loading branch information
emcake and Matthew Kemp committed Nov 15, 2023
1 parent 7ba36b0 commit 7941577
Show file tree
Hide file tree
Showing 4 changed files with 401 additions and 74 deletions.
228 changes: 211 additions & 17 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,8 +636,16 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Type::BYTE_ARRAY | Type::FIXED_LEN_BYTE_ARRAY => {
self.column_index_builder.append(
null_page,
self.truncate_min_value(stat.min_bytes()),
self.truncate_max_value(stat.max_bytes()),
self.truncate_min_value(
self.props.column_index_truncate_length(),
stat.min_bytes(),
)
.0,
self.truncate_max_value(
self.props.column_index_truncate_length(),
stat.max_bytes(),
)
.0,
self.page_metrics.num_page_nulls as i64,
);
}
Expand All @@ -658,26 +666,26 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.append_row_count(self.page_metrics.num_buffered_rows as i64);
}

fn truncate_min_value(&self, data: &[u8]) -> Vec<u8> {
self.props
.column_index_truncate_length()
fn truncate_min_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
truncation_length
.filter(|l| data.len() > *l)
.and_then(|l| match str::from_utf8(data) {
Ok(str_data) => truncate_utf8(str_data, l),
Err(_) => Some(data[..l].to_vec()),
})
.unwrap_or_else(|| data.to_vec())
.map(|truncated| (truncated, true))
.unwrap_or_else(|| (data.to_vec(), false))
}

fn truncate_max_value(&self, data: &[u8]) -> Vec<u8> {
self.props
.column_index_truncate_length()
fn truncate_max_value(&self, truncation_length: Option<usize>, data: &[u8]) -> (Vec<u8>, bool) {
truncation_length
.filter(|l| data.len() > *l)
.and_then(|l| match str::from_utf8(data) {
Ok(str_data) => truncate_utf8(str_data, l).and_then(increment_utf8),
Err(_) => increment(data[..l].to_vec()),
})
.unwrap_or_else(|| data.to_vec())
.map(|truncated| (truncated, true))
.unwrap_or_else(|| (data.to_vec(), false))
}

/// Adds data page.
Expand Down Expand Up @@ -856,20 +864,64 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
.set_dictionary_page_offset(dict_page_offset);

if self.statistics_enabled != EnabledStatistics::None {
let backwards_compatible_min_max = self.descr.sort_order().is_signed();

let statistics = ValueStatistics::<E::T>::new(
self.column_metrics.min_column_value.clone(),
self.column_metrics.max_column_value.clone(),
self.column_metrics.column_distinct_count,
self.column_metrics.num_column_nulls,
false,
);
)
.with_backwards_compatible_min_max(backwards_compatible_min_max)
.into();

let statistics = match statistics {
Statistics::ByteArray(stats) if stats.has_min_max_set() => {
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes(),
);
Statistics::ByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
Statistics::FixedLenByteArray(stats) if stats.has_min_max_set() => {
let (min, did_truncate_min) = self.truncate_min_value(
self.props.statistics_truncate_length(),
stats.min_bytes(),
);
let (max, did_truncate_max) = self.truncate_max_value(
self.props.statistics_truncate_length(),
stats.max_bytes(),
);
Statistics::FixedLenByteArray(
ValueStatistics::new(
Some(min.into()),
Some(max.into()),
stats.distinct_count(),
stats.null_count(),
backwards_compatible_min_max,
)
.with_max_is_exact(!did_truncate_max)
.with_min_is_exact(!did_truncate_min),
)
}
stats => stats,
};

// Some common readers only support the deprecated statistics
// format so we also write them out if possible
// See https://github.com/apache/arrow-rs/issues/799
let statistics = statistics
.with_backwards_compatible_min_max(self.descr.sort_order().is_signed())
.into();
builder = builder.set_statistics(statistics);
}

Expand Down Expand Up @@ -2612,6 +2664,148 @@ mod tests {
}
}

#[test]
fn test_statistics_truncating_byte_array() {
    // Chunk-level statistics are limited to a single byte in this test.
    const TEST_TRUNCATE_LENGTH: usize = 1;

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<ByteArrayType>(page_writer, 0, 0, props);

    // The only value written; it is longer than the truncation length, so both
    // the min and the max derived from it are expected to be truncated.
    let mut value = ByteArray::default();
    value.set_data(Bytes::from(String::from("Blart Versenwald III")));
    let data = vec![value];

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(1, close_result.rows_written);

    let stats = close_result.metadata.statistics().expect("statistics");
    assert!(stats.has_min_max_set());
    assert_eq!(stats.null_count(), 0);
    assert_eq!(stats.distinct_count(), None);

    match stats {
        Statistics::ByteArray(byte_stats) => {
            let min_value = byte_stats.min();
            let max_value = byte_stats.max();

            // Truncated bounds must be flagged as inexact.
            assert!(!byte_stats.min_is_exact());
            assert!(!byte_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            // The min is the one-byte prefix; the max is that prefix incremented
            // so that it still bounds the original value from above.
            assert_eq!("B".as_bytes(), min_value.as_bytes());
            assert_eq!("C".as_bytes(), max_value.as_bytes());
        }
        _ => panic!("expecting Statistics::ByteArray"),
    }
}

#[test]
fn test_statistics_truncating_fixed_len_byte_array() {
    // Chunk-level statistics are limited to a single byte in this test.
    const TEST_TRUNCATE_LENGTH: usize = 1;

    // A 16-byte value laid out the way parquet stores decimals (big-endian).
    const PSEUDO_DECIMAL_VALUE: i128 = 6541894651216648486512564456564654;
    const PSEUDO_DECIMAL_BYTES: [u8; 16] = PSEUDO_DECIMAL_VALUE.to_be_bytes();

    // Expected min: the leading byte of the value.
    const EXPECTED_MIN: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0]];
    // Expected max: the leading byte incremented by one.
    const EXPECTED_MAX: [u8; TEST_TRUNCATE_LENGTH] = [PSEUDO_DECIMAL_BYTES[0].wrapping_add(1)];

    // Zero-extends a truncated big-endian prefix back to 16 bytes so it can be
    // compared numerically against the original value.
    fn zero_extend_be(prefix: &[u8]) -> i128 {
        let mut bytes = [0u8; 16];
        bytes[..prefix.len()].copy_from_slice(prefix);
        i128::from_be_bytes(bytes)
    }

    let page_writer = get_test_page_writer();
    let props = Arc::new(
        WriterProperties::builder()
            .set_statistics_truncate_length(Some(TEST_TRUNCATE_LENGTH))
            .build(),
    );
    let mut writer = get_test_column_writer::<FixedLenByteArrayType>(page_writer, 0, 0, props);

    // The only value written; it is longer than the truncation length, so both
    // the min and the max derived from it are expected to be truncated.
    let mut data = vec![FixedLenByteArray::default(); 1];
    data[0].set_data(Bytes::from(PSEUDO_DECIMAL_BYTES.as_slice()));

    writer.write_batch(&data, None, None).unwrap();
    writer.flush_data_pages().unwrap();

    let close_result = writer.close().unwrap();
    assert_eq!(1, close_result.rows_written);

    let stats = close_result.metadata.statistics().expect("statistics");
    assert!(stats.has_min_max_set());
    assert_eq!(stats.null_count(), 0);
    assert_eq!(stats.distinct_count(), None);

    match stats {
        Statistics::FixedLenByteArray(fixed_stats) => {
            let min_value = fixed_stats.min();
            let max_value = fixed_stats.max();

            // Truncated bounds must be flagged as inexact.
            assert!(!fixed_stats.min_is_exact());
            assert!(!fixed_stats.max_is_exact());

            assert_eq!(min_value.len(), TEST_TRUNCATE_LENGTH);
            assert_eq!(max_value.len(), TEST_TRUNCATE_LENGTH);

            assert_eq!(EXPECTED_MIN.as_slice(), min_value.as_bytes());
            assert_eq!(EXPECTED_MAX.as_slice(), max_value.as_bytes());

            let reconstructed_min = zero_extend_be(min_value.as_bytes());
            let reconstructed_max = zero_extend_be(max_value.as_bytes());

            // Check that the original value is still bounded by the truncated
            // min/max; the values are reported only if an assertion fails.
            assert!(
                reconstructed_min <= PSEUDO_DECIMAL_VALUE,
                "min: {reconstructed_min} {PSEUDO_DECIMAL_VALUE}"
            );
            assert!(
                reconstructed_max >= PSEUDO_DECIMAL_VALUE,
                "max {reconstructed_max} {PSEUDO_DECIMAL_VALUE}"
            );
        }
        _ => panic!("expecting Statistics::FixedLenByteArray"),
    }
}

#[test]
fn test_send() {
fn test<T: Send>() {}
Expand Down
24 changes: 24 additions & 0 deletions parquet/src/file/properties.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
/// Default value for [`BloomFilterProperties::ndv`]
pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
/// Default value for [`WriterProperties::statistics_truncate_length`]
/// (`None`, i.e. statistics min/max values are not truncated)
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = None;

/// Parquet writer version.
///
Expand Down Expand Up @@ -136,6 +138,7 @@ pub struct WriterProperties {
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
}

impl Default for WriterProperties {
Expand Down Expand Up @@ -241,6 +244,13 @@ impl WriterProperties {
self.column_index_truncate_length
}

/// Returns the maximum length, in bytes, of truncated min/max values in statistics.
///
/// `None` if truncation is disabled (the default, see
/// [`DEFAULT_STATISTICS_TRUNCATE_LENGTH`]); otherwise the length is greater than 0,
/// as enforced by [`WriterPropertiesBuilder::set_statistics_truncate_length`].
pub fn statistics_truncate_length(&self) -> Option<usize> {
self.statistics_truncate_length
}

/// Returns encoding for a data page, when dictionary encoding is enabled.
/// This is not configurable.
#[inline]
Expand Down Expand Up @@ -334,6 +344,7 @@ pub struct WriterPropertiesBuilder {
column_properties: HashMap<ColumnPath, ColumnProperties>,
sorting_columns: Option<Vec<SortingColumn>>,
column_index_truncate_length: Option<usize>,
statistics_truncate_length: Option<usize>,
}

impl WriterPropertiesBuilder {
Expand All @@ -352,6 +363,7 @@ impl WriterPropertiesBuilder {
column_properties: HashMap::new(),
sorting_columns: None,
column_index_truncate_length: DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
statistics_truncate_length: DEFAULT_STATISTICS_TRUNCATE_LENGTH,
}
}

Expand All @@ -370,6 +382,7 @@ impl WriterPropertiesBuilder {
column_properties: self.column_properties,
sorting_columns: self.sorting_columns,
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
}
}

Expand Down Expand Up @@ -643,6 +656,17 @@ impl WriterPropertiesBuilder {
self.column_index_truncate_length = max_length;
self
}

/// Sets the max length of min/max value fields in statistics. Must be greater than 0.
/// If set to `None` - there's no effective limit.
pub fn set_statistics_truncate_length(mut self, max_length: Option<usize>) -> Self {
if let Some(value) = max_length {
assert!(value > 0, "Cannot have a 0 statistics truncate length. If you wish to disable min/max value truncation, set it to `None`.");
}

self.statistics_truncate_length = max_length;
self
}
}

/// Controls the level of statistics to be computed by the writer
Expand Down

0 comments on commit 7941577

Please sign in to comment.