Skip to content

Commit

Permalink
Use BufWriter when writing bloom filters (#3318) (#3319)
Browse files Browse the repository at this point in the history
Disable bloom filters for most tests
  • Loading branch information
tustvold committed Dec 9, 2022
1 parent f078aed commit c215f49
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 17 deletions.
55 changes: 47 additions & 8 deletions parquet/src/arrow/arrow_writer/mod.rs
Expand Up @@ -1225,16 +1225,44 @@ mod tests {
file
}

/// Options controlling a single-column write/read round-trip in the tests below.
struct RoundTripOptions {
    values: ArrayRef,
    schema: SchemaRef,
    bloom_filter: bool,
}

impl RoundTripOptions {
    /// Builds options for a single column named "col" with the given
    /// nullability; bloom filters are disabled by default so that only
    /// tests that opt in pay their cost.
    fn new(values: ArrayRef, nullable: bool) -> Self {
        let field = Field::new("col", values.data_type().clone(), nullable);
        Self {
            values,
            schema: Arc::new(Schema::new(vec![field])),
            bloom_filter: false,
        }
    }
}

/// Round-trips `values` as a single column named "col" with the given
/// nullability, using default options (bloom filters disabled), and
/// returns the written files for further inspection.
fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
    // NOTE(review): the diff residue left the pre-change body (schema
    // construction + call to one_column_roundtrip_with_schema) interleaved
    // here; only the post-change delegation belongs in the final code.
    one_column_roundtrip_with_options(RoundTripOptions::new(values, nullable))
}

fn one_column_roundtrip_with_schema(
values: ArrayRef,
schema: SchemaRef,
) -> Vec<File> {
let mut options = RoundTripOptions::new(values, false);
options.schema = schema;
one_column_roundtrip_with_options(options)
}

fn one_column_roundtrip_with_options(options: RoundTripOptions) -> Vec<File> {
let RoundTripOptions {
values,
schema,
bloom_filter,
} = options;

let encodings = match values.data_type() {
DataType::Utf8
| DataType::LargeUtf8
Expand Down Expand Up @@ -1270,7 +1298,7 @@ mod tests {
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_pagesize_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(true)
.set_bloom_filter_enabled(bloom_filter)
.build();

files.push(roundtrip_opts(&expected_batch, props))
Expand Down Expand Up @@ -1596,8 +1624,11 @@ mod tests {

#[test]
fn i32_column_bloom_filter() {
let positive_values: Vec<i32> = (0..SMALL_SIZE as i32).collect();
let files = values_required::<Int32Array, _>(positive_values);
let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
let mut options = RoundTripOptions::new(array, false);
options.bloom_filter = true;

let files = one_column_roundtrip_with_options(options);
check_bloom_filter(
files,
"col".to_string(),
Expand All @@ -1612,7 +1643,11 @@ mod tests {
let many_vecs: Vec<_> = std::iter::repeat(one_vec).take(SMALL_SIZE).collect();
let many_vecs_iter = many_vecs.iter().map(|v| v.as_slice());

let files = values_required::<BinaryArray, _>(many_vecs_iter);
let array = Arc::new(BinaryArray::from_iter_values(many_vecs_iter));
let mut options = RoundTripOptions::new(array, false);
options.bloom_filter = true;

let files = one_column_roundtrip_with_options(options);
check_bloom_filter(
files,
"col".to_string(),
Expand All @@ -1626,7 +1661,11 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE).map(|i| i.to_string()).collect();
let raw_strs = raw_values.iter().map(|s| s.as_str());

let files = values_optional::<StringArray, _>(raw_strs);
let array = Arc::new(StringArray::from_iter_values(raw_strs));
let mut options = RoundTripOptions::new(array, false);
options.bloom_filter = true;

let files = one_column_roundtrip_with_options(options);

let optional_raw_values: Vec<_> = raw_values
.iter()
Expand Down
7 changes: 5 additions & 2 deletions parquet/src/bloom_filter/mod.rs
Expand Up @@ -28,7 +28,7 @@ use crate::format::{
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::Write;
use std::io::{BufWriter, Write};
use std::sync::Arc;
use thrift::protocol::{
TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol, TSerializable,
Expand Down Expand Up @@ -177,14 +177,17 @@ impl Sbbf {
}

/// Write the bloom filter data (header and then bitset) to the output
pub(crate) fn write<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
pub(crate) fn write<W: Write>(&self, writer: W) -> Result<(), ParquetError> {
// Use a BufWriter to avoid costs of writing individual blocks
let mut writer = BufWriter::new(writer);
let mut protocol = TCompactOutputProtocol::new(&mut writer);
let header = self.header();
header.write_to_out_protocol(&mut protocol).map_err(|e| {
ParquetError::General(format!("Could not write bloom filter header: {}", e))
})?;
protocol.flush()?;
self.write_bitset(&mut writer)?;
writer.flush()?;
Ok(())
}

Expand Down
14 changes: 7 additions & 7 deletions parquet/src/file/properties.rs
Expand Up @@ -630,7 +630,7 @@ struct ColumnProperties {
statistics_enabled: Option<EnabledStatistics>,
max_statistics_size: Option<usize>,
/// bloom filter related properties
bloom_filter_properies: Option<BloomFilterProperties>,
bloom_filter_properties: Option<BloomFilterProperties>,
}

impl ColumnProperties {
Expand Down Expand Up @@ -674,10 +674,10 @@ impl ColumnProperties {
/// otherwise it is a no-op.
/// If `value` is `false`, resets bloom filter properties to `None`.
fn set_bloom_filter_enabled(&mut self, value: bool) {
if value && self.bloom_filter_properies.is_none() {
self.bloom_filter_properies = Some(Default::default())
if value && self.bloom_filter_properties.is_none() {
self.bloom_filter_properties = Some(Default::default())
} else if !value {
self.bloom_filter_properies = None
self.bloom_filter_properties = None
}
}

Expand All @@ -694,15 +694,15 @@ impl ColumnProperties {
value
);

self.bloom_filter_properies
self.bloom_filter_properties
.get_or_insert_with(Default::default)
.fpp = value;
}

/// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly
/// enables bloom filter if not previously enabled.
fn set_bloom_filter_ndv(&mut self, value: u64) {
self.bloom_filter_properies
self.bloom_filter_properties
.get_or_insert_with(Default::default)
.ndv = value;
}
Expand Down Expand Up @@ -737,7 +737,7 @@ impl ColumnProperties {

/// Returns the bloom filter properties, or `None` if not enabled
fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
    self.bloom_filter_properties.as_ref()
}
}

Expand Down

0 comments on commit c215f49

Please sign in to comment.