From 27b75368e82da5b130e5af065c90459f76e433c4 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Mon, 1 Aug 2022 13:25:36 +0100 Subject: [PATCH] Remove fallibility from RLEEncoder (#2226) --- parquet/src/arrow/arrow_writer/byte_array.rs | 14 ++-- .../arrow/record_reader/definition_levels.rs | 8 +-- parquet/src/column/writer/mod.rs | 16 ++--- .../src/encodings/encoding/dict_encoder.rs | 8 +-- parquet/src/encodings/encoding/mod.rs | 6 +- parquet/src/encodings/levels.rs | 35 ++++----- parquet/src/encodings/rle.rs | 71 ++++++++----------- parquet/src/util/test_common/page_util.rs | 4 +- 8 files changed, 70 insertions(+), 92 deletions(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 52698a31bcdc..d1a0da5b391d 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -374,7 +374,7 @@ impl DictEncoder { &mut self, min_value: Option, max_value: Option, - ) -> Result> { + ) -> DataPageValues { let num_values = self.indices.len(); let buffer_len = self.estimated_data_page_size(); let mut buffer = Vec::with_capacity(buffer_len); @@ -382,20 +382,18 @@ impl DictEncoder { let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer); for index in &self.indices { - if !encoder.put(*index as u64)? { - return Err(general_err!("Encoder doesn't have enough space")); - } + encoder.put(*index as u64) } self.indices.clear(); - Ok(DataPageValues { - buf: encoder.consume()?.into(), + DataPageValues { + buf: encoder.consume().into(), num_values, encoding: Encoding::RLE_DICTIONARY, min_value, max_value, - }) + } } } @@ -500,7 +498,7 @@ impl ColumnValueEncoder for ByteArrayEncoder { let max_value = self.max_value.take(); match &mut self.dict_encoder { - Some(encoder) => encoder.flush_data_page(min_value, max_value), + Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)), _ => self.fallback.flush_data_page(min_value, max_value), } } diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index 53eeab9a514c..2d65db77fa69 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -408,12 +408,12 @@ mod tests { let mut encoder = RleEncoder::new(1, 1024); for _ in 0..len { let bool = rng.gen_bool(0.8); - assert!(encoder.put(bool as u64).unwrap()); + encoder.put(bool as u64); expected.append(bool); } assert_eq!(expected.len(), len); - let encoded = encoder.consume().unwrap(); + let encoded = encoder.consume(); let mut decoder = PackedDecoder::new(); decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); @@ -444,7 +444,7 @@ mod tests { let mut total_value = 0; for _ in 0..len { let bool = rng.gen_bool(0.8); - assert!(encoder.put(bool as u64).unwrap()); + encoder.put(bool as u64); expected.append(bool); if bool { total_value += 1; @@ -452,7 +452,7 @@ mod tests { } assert_eq!(expected.len(), len); - let encoded = encoder.consume().unwrap(); + let encoded = encoder.consume(); let mut decoder = PackedDecoder::new(); decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded)); diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 6c467b5e4c0b..ce773c19d52b 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -630,7 +630,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Encoding::RLE, &self.rep_levels_sink[..], max_rep_level, - )?[..], + )[..], ); } @@ -640,7 +640,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { Encoding::RLE, &self.def_levels_sink[..], max_def_level, - )?[..], + )[..], ); } @@ -671,14 +671,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { if max_rep_level > 0 { let levels = - self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level)?; + self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level); rep_levels_byte_len = levels.len(); buffer.extend_from_slice(&levels[..]); } if max_def_level > 0 { let levels = - self.encode_levels_v2(&self.def_levels_sink[..], max_def_level)?; + self.encode_levels_v2(&self.def_levels_sink[..], max_def_level); def_levels_byte_len = levels.len(); buffer.extend_from_slice(&levels[..]); } @@ -794,18 +794,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { encoding: Encoding, levels: &[i16], max_level: i16, - ) -> Result> { + ) -> Vec { let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len()); - encoder.put(levels)?; + encoder.put(levels); encoder.consume() } /// Encodes definition or repetition levels for Data Page v2. /// Encoding is always RLE. #[inline] - fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result> { + fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec { let mut encoder = LevelEncoder::v2(max_level, levels.len()); - encoder.put(levels)?; + encoder.put(levels); encoder.consume() } diff --git a/parquet/src/encodings/encoding/dict_encoder.rs b/parquet/src/encodings/encoding/dict_encoder.rs index 8f3c98acafeb..a7855cc84606 100644 --- a/parquet/src/encodings/encoding/dict_encoder.rs +++ b/parquet/src/encodings/encoding/dict_encoder.rs @@ -23,7 +23,7 @@ use crate::data_type::private::ParquetValueType; use crate::data_type::DataType; use crate::encodings::encoding::{Encoder, PlainEncoder}; use crate::encodings::rle::RleEncoder; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; @@ -132,12 +132,10 @@ impl DictEncoder { // Write bit width in the first byte let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer); for index in &self.indices { - if !encoder.put(*index as u64)? { - return Err(general_err!("Encoder doesn't have enough space")); - } + encoder.put(*index as u64) } self.indices.clear(); - Ok(ByteBufferPtr::new(encoder.consume()?)) + Ok(ByteBufferPtr::new(encoder.consume())) } fn put_one(&mut self, value: &T::T) { diff --git a/parquet/src/encodings/encoding/mod.rs b/parquet/src/encodings/encoding/mod.rs index 383211f128ce..b0c8fa10faa7 100644 --- a/parquet/src/encodings/encoding/mod.rs +++ b/parquet/src/encodings/encoding/mod.rs @@ -195,9 +195,7 @@ impl Encoder for RleValueEncoder { for value in values { let value = value.as_u64()?; - if !rle_encoder.put(value)? { - return Err(general_err!("RLE buffer is full")); - } + rle_encoder.put(value) } Ok(()) } @@ -227,7 +225,7 @@ impl Encoder for RleValueEncoder { .expect("RLE value encoder is not initialized"); // Flush all encoder buffers and raw values - let mut buf = rle_encoder.consume()?; + let mut buf = rle_encoder.consume(); assert!(buf.len() > 4, "should have had padding inserted"); // Note that buf does not have any offset, all data is encoded bytes diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs index 8bdfdd3e9dc3..62c68d843c71 100644 --- a/parquet/src/encodings/levels.rs +++ b/parquet/src/encodings/levels.rs @@ -21,7 +21,7 @@ use super::rle::{RleDecoder, RleEncoder}; use crate::basic::Encoding; use crate::data_type::AsBytes; -use crate::errors::{ParquetError, Result}; +use crate::errors::Result; use crate::util::{ bit_util::{ceil, num_required_bits, BitReader, BitWriter}, memory::ByteBufferPtr, @@ -97,21 +97,16 @@ impl LevelEncoder { /// Put/encode levels vector into this level encoder. /// Returns number of encoded values that are less than or equal to length of the /// input buffer. - /// - /// RLE and BIT_PACKED level encoders return Err() when internal buffer overflows or - /// flush fails. #[inline] - pub fn put(&mut self, buffer: &[i16]) -> Result { + pub fn put(&mut self, buffer: &[i16]) -> usize { let mut num_encoded = 0; match *self { LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => { for value in buffer { - if !encoder.put(*value as u64)? { - return Err(general_err!("RLE buffer is full")); - } + encoder.put(*value as u64); num_encoded += 1; } - encoder.flush()?; + encoder.flush(); } LevelEncoder::BitPacked(bit_width, ref mut encoder) => { for value in buffer { @@ -121,25 +116,25 @@ impl LevelEncoder { encoder.flush(); } } - Ok(num_encoded) + num_encoded } /// Finalizes level encoder, flush all intermediate buffers and return resulting /// encoded buffer. Returned buffer is already truncated to encoded bytes only. #[inline] - pub fn consume(self) -> Result> { + pub fn consume(self) -> Vec { match self { LevelEncoder::Rle(encoder) => { - let mut encoded_data = encoder.consume()?; + let mut encoded_data = encoder.consume(); // Account for the buffer offset let encoded_len = encoded_data.len() - mem::size_of::(); let len = (encoded_len as i32).to_le(); let len_bytes = len.as_bytes(); encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes); - Ok(encoded_data) + encoded_data } LevelEncoder::RleV2(encoder) => encoder.consume(), - LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()), + LevelEncoder::BitPacked(_, encoder) => encoder.consume(), } } } @@ -287,8 +282,8 @@ mod tests { } else { LevelEncoder::v1(enc, max_level, levels.len()) }; - encoder.put(levels).expect("put() should be OK"); - let encoded_levels = encoder.consume().expect("consume() should be OK"); + encoder.put(levels); + let encoded_levels = encoder.consume(); let byte_buf = ByteBufferPtr::new(encoded_levels); let mut decoder; @@ -318,8 +313,8 @@ mod tests { } else { LevelEncoder::v1(enc, max_level, levels.len()) }; - encoder.put(levels).expect("put() should be OK"); - let encoded_levels = encoder.consume().expect("consume() should be OK"); + encoder.put(levels); + let encoded_levels = encoder.consume(); let byte_buf = ByteBufferPtr::new(encoded_levels); let mut decoder; @@ -366,8 +361,8 @@ mod tests { LevelEncoder::v1(enc, max_level, levels.len()) }; // Encode only one value - let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK"); - let encoded_levels = encoder.consume().expect("consume() should be OK"); + let num_encoded = encoder.put(&levels[0..1]); + let encoded_levels = encoder.consume(); assert_eq!(num_encoded, 1); let byte_buf = ByteBufferPtr::new(encoded_levels); diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs index 28ebd7d3a171..aad833e0eee3 100644 --- a/parquet/src/encodings/rle.rs +++ b/parquet/src/encodings/rle.rs @@ -132,23 +132,21 @@ impl RleEncoder { } /// Encodes `value`, which must be representable with `bit_width` bits. - /// Returns true if the value fits in buffer, false if it doesn't, or - /// error if something is wrong. #[inline] - pub fn put(&mut self, value: u64) -> Result { + pub fn put(&mut self, value: u64) { // This function buffers 8 values at a time. After seeing 8 values, it // decides whether the current run should be encoded in bit-packed or RLE. if self.current_value == value { self.repeat_count += 1; if self.repeat_count > 8 { // A continuation of last value. No need to buffer. - return Ok(true); + return; } } else { if self.repeat_count >= 8 { // The current RLE run has ended and we've gathered enough. Flush first. assert_eq!(self.bit_packed_count, 0); - self.flush_rle_run()?; + self.flush_rle_run(); } self.repeat_count = 1; self.current_value = value; @@ -159,10 +157,8 @@ impl RleEncoder { if self.num_buffered_values == 8 { // Buffered values are full. Flush them. assert_eq!(self.bit_packed_count % 8, 0); - self.flush_buffered_values()?; + self.flush_buffered_values(); } - - Ok(true) } #[inline] @@ -180,17 +176,17 @@ impl RleEncoder { } #[inline] - pub fn consume(mut self) -> Result> { - self.flush()?; - Ok(self.bit_writer.consume()) + pub fn consume(mut self) -> Vec { + self.flush(); + self.bit_writer.consume() } /// Borrow equivalent of the `consume` method. /// Call `clear()` after invoking this method. #[inline] - pub fn flush_buffer(&mut self) -> Result<&[u8]> { - self.flush()?; - Ok(self.bit_writer.flush_buffer()) + pub fn flush_buffer(&mut self) -> &[u8] { + self.flush(); + self.bit_writer.flush_buffer() } /// Clears the internal state so this encoder can be reused (e.g., after becoming @@ -208,7 +204,7 @@ impl RleEncoder { /// Flushes all remaining values and return the final byte buffer maintained by the /// internal writer. #[inline] - pub fn flush(&mut self) -> Result<()> { + pub fn flush(&mut self) { if self.bit_packed_count > 0 || self.repeat_count > 0 || self.num_buffered_values > 0 @@ -217,7 +213,7 @@ impl RleEncoder { && (self.repeat_count == self.num_buffered_values || self.num_buffered_values == 0); if self.repeat_count > 0 && all_repeat { - self.flush_rle_run()?; + self.flush_rle_run(); } else { // Buffer the last group of bit-packed values to 8 by padding with 0s. if self.num_buffered_values > 0 { @@ -227,14 +223,13 @@ impl RleEncoder { } } self.bit_packed_count += self.num_buffered_values; - self.flush_bit_packed_run(true)?; + self.flush_bit_packed_run(true); self.repeat_count = 0; } } - Ok(()) } - fn flush_rle_run(&mut self) -> Result<()> { + fn flush_rle_run(&mut self) { assert!(self.repeat_count > 0); let indicator_value = self.repeat_count << 1; self.bit_writer.put_vlq_int(indicator_value as u64); @@ -244,10 +239,9 @@ impl RleEncoder { ); self.num_buffered_values = 0; self.repeat_count = 0; - Ok(()) } - fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) -> Result<()> { + fn flush_bit_packed_run(&mut self, update_indicator_byte: bool) { if self.indicator_byte_pos < 0 { self.indicator_byte_pos = self.bit_writer.skip(1) as i64; } @@ -270,20 +264,19 @@ impl RleEncoder { self.indicator_byte_pos = -1; self.bit_packed_count = 0; } - Ok(()) } #[inline(never)] - fn flush_buffered_values(&mut self) -> Result<()> { + fn flush_buffered_values(&mut self) { if self.repeat_count >= 8 { self.num_buffered_values = 0; if self.bit_packed_count > 0 { // In this case we choose RLE encoding. Flush the current buffered values // as bit-packed encoding. assert_eq!(self.bit_packed_count % 8, 0); - self.flush_bit_packed_run(true)? + self.flush_bit_packed_run(true) } - return Ok(()); + return; } self.bit_packed_count += self.num_buffered_values; @@ -292,12 +285,11 @@ impl RleEncoder { // We've reached the maximum value that can be hold in a single bit-packed // run. assert!(self.indicator_byte_pos >= 0); - self.flush_bit_packed_run(true)?; + self.flush_bit_packed_run(true); } else { - self.flush_bit_packed_run(false)?; + self.flush_bit_packed_run(false); } self.repeat_count = 0; - Ok(()) } } @@ -585,11 +577,11 @@ mod tests { let mut encoder1 = RleEncoder::new(3, 256); let mut encoder2 = RleEncoder::new(3, 256); for value in data { - encoder1.put(value as u64).unwrap(); - encoder2.put(value as u64).unwrap(); + encoder1.put(value as u64); + encoder2.put(value as u64); } - let res1 = encoder1.flush_buffer().unwrap(); - let res2 = encoder2.consume().unwrap(); + let res1 = encoder1.flush_buffer(); + let res2 = encoder2.consume(); assert_eq!(res1, &res2[..]); } @@ -763,10 +755,9 @@ mod tests { let buffer_len = 64 * 1024; let mut encoder = RleEncoder::new(bit_width, buffer_len); for v in values { - let result = encoder.put(*v as u64); - assert!(result.is_ok()); + encoder.put(*v as u64) } - let buffer = ByteBufferPtr::new(encoder.consume().expect("Expect consume() OK")); + let buffer = ByteBufferPtr::new(encoder.consume()); if expected_len != -1 { assert_eq!(buffer.len(), expected_len as usize); } @@ -919,9 +910,9 @@ mod tests { let values: Vec = vec![0, 1, 1, 1, 1, 0, 0, 0, 0, 1]; let mut encoder = RleEncoder::new(bit_width, buffer_len); for v in &values { - assert!(encoder.put(*v as u64).expect("put() should be OK")); + encoder.put(*v as u64) } - let buffer = encoder.consume().expect("consume() should be OK"); + let buffer = encoder.consume(); let mut decoder = RleDecoder::new(bit_width); decoder.set_data(ByteBufferPtr::new(buffer)); let mut actual_values: Vec = vec![0; values.len()]; @@ -935,12 +926,10 @@ mod tests { let buffer_len = 64 * 1024; let mut encoder = RleEncoder::new(bit_width, buffer_len); for v in values { - let result = encoder.put(*v as u64).expect("put() should be OK"); - assert!(result, "put() should not return false"); + encoder.put(*v as u64) } - let buffer = - ByteBufferPtr::new(encoder.consume().expect("consume() should be OK")); + let buffer = ByteBufferPtr::new(encoder.consume()); // Verify read let mut decoder = RleDecoder::new(bit_width); diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs index d7653d4e585f..bc197d00e00d 100644 --- a/parquet/src/util/test_common/page_util.rs +++ b/parquet/src/util/test_common/page_util.rs @@ -75,8 +75,8 @@ impl DataPageBuilderImpl { return 0; } let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len()); - level_encoder.put(levels).expect("put() should be OK"); - let encoded_levels = level_encoder.consume().expect("consume() should be OK"); + level_encoder.put(levels); + let encoded_levels = level_encoder.consume(); // Actual encoded bytes (without length offset) let encoded_bytes = &encoded_levels[mem::size_of::()..]; if self.datapage_v2 {