Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove fallibility from paruqet RleEncoder (#2226) #2259

Merged
merged 1 commit into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 6 additions & 8 deletions parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,28 +374,26 @@ impl DictEncoder {
&mut self,
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
) -> Result<DataPageValues<ByteArray>> {
) -> DataPageValues<ByteArray> {
let num_values = self.indices.len();
let buffer_len = self.estimated_data_page_size();
let mut buffer = Vec::with_capacity(buffer_len);
buffer.push(self.bit_width() as u8);

let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
encoder.put(*index as u64)
}

self.indices.clear();

Ok(DataPageValues {
buf: encoder.consume()?.into(),
DataPageValues {
buf: encoder.consume().into(),
num_values,
encoding: Encoding::RLE_DICTIONARY,
min_value,
max_value,
})
}
}
}

Expand Down Expand Up @@ -500,7 +498,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
let max_value = self.max_value.take();

match &mut self.dict_encoder {
Some(encoder) => encoder.flush_data_page(min_value, max_value),
Some(encoder) => Ok(encoder.flush_data_page(min_value, max_value)),
_ => self.fallback.flush_data_page(min_value, max_value),
}
}
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/arrow/record_reader/definition_levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,12 @@ mod tests {
let mut encoder = RleEncoder::new(1, 1024);
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
encoder.put(bool as u64);
expected.append(bool);
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

Expand Down Expand Up @@ -444,15 +444,15 @@ mod tests {
let mut total_value = 0;
for _ in 0..len {
let bool = rng.gen_bool(0.8);
assert!(encoder.put(bool as u64).unwrap());
encoder.put(bool as u64);
expected.append(bool);
if bool {
total_value += 1;
}
}
assert_eq!(expected.len(), len);

let encoded = encoder.consume().unwrap();
let encoded = encoder.consume();
let mut decoder = PackedDecoder::new();
decoder.set_data(Encoding::RLE, ByteBufferPtr::new(encoded));

Expand Down
16 changes: 8 additions & 8 deletions parquet/src/column/writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.rep_levels_sink[..],
max_rep_level,
)?[..],
)[..],
);
}

Expand All @@ -640,7 +640,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Encoding::RLE,
&self.def_levels_sink[..],
max_def_level,
)?[..],
)[..],
);
}

Expand Down Expand Up @@ -671,14 +671,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {

if max_rep_level > 0 {
let levels =
self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level)?;
self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
rep_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}

if max_def_level > 0 {
let levels =
self.encode_levels_v2(&self.def_levels_sink[..], max_def_level)?;
self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
def_levels_byte_len = levels.len();
buffer.extend_from_slice(&levels[..]);
}
Expand Down Expand Up @@ -794,18 +794,18 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
encoding: Encoding,
levels: &[i16],
max_level: i16,
) -> Result<Vec<u8>> {
) -> Vec<u8> {
let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
encoder.put(levels)?;
encoder.put(levels);
encoder.consume()
}

/// Encodes definition or repetition levels for Data Page v2.
/// Encoding is always RLE.
#[inline]
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Result<Vec<u8>> {
fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
let mut encoder = LevelEncoder::v2(max_level, levels.len());
encoder.put(levels)?;
encoder.put(levels);
encoder.consume()
}

Expand Down
8 changes: 3 additions & 5 deletions parquet/src/encodings/encoding/dict_encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::DataType;
use crate::encodings::encoding::{Encoder, PlainEncoder};
use crate::encodings::rle::RleEncoder;
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
Expand Down Expand Up @@ -132,12 +132,10 @@ impl<T: DataType> DictEncoder<T> {
// Write bit width in the first byte
let mut encoder = RleEncoder::new_from_buf(self.bit_width(), buffer);
for index in &self.indices {
if !encoder.put(*index as u64)? {
return Err(general_err!("Encoder doesn't have enough space"));
}
encoder.put(*index as u64)
}
self.indices.clear();
Ok(ByteBufferPtr::new(encoder.consume()?))
Ok(ByteBufferPtr::new(encoder.consume()))
}

fn put_one(&mut self, value: &T::T) {
Expand Down
6 changes: 2 additions & 4 deletions parquet/src/encodings/encoding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {

for value in values {
let value = value.as_u64()?;
if !rle_encoder.put(value)? {
return Err(general_err!("RLE buffer is full"));
}
rle_encoder.put(value)
}
Ok(())
}
Expand Down Expand Up @@ -227,7 +225,7 @@ impl<T: DataType> Encoder<T> for RleValueEncoder<T> {
.expect("RLE value encoder is not initialized");

// Flush all encoder buffers and raw values
let mut buf = rle_encoder.consume()?;
let mut buf = rle_encoder.consume();
assert!(buf.len() > 4, "should have had padding inserted");

// Note that buf does not have any offset, all data is encoded bytes
Expand Down
35 changes: 15 additions & 20 deletions parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use super::rle::{RleDecoder, RleEncoder};

use crate::basic::Encoding;
use crate::data_type::AsBytes;
use crate::errors::{ParquetError, Result};
use crate::errors::Result;
use crate::util::{
bit_util::{ceil, num_required_bits, BitReader, BitWriter},
memory::ByteBufferPtr,
Expand Down Expand Up @@ -97,21 +97,16 @@ impl LevelEncoder {
/// Put/encode levels vector into this level encoder.
/// Returns number of encoded values that are less than or equal to length of the
/// input buffer.
///
/// RLE and BIT_PACKED level encoders return Err() when internal buffer overflows or
/// flush fails.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
pub fn put(&mut self, buffer: &[i16]) -> usize {
let mut num_encoded = 0;
match *self {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
}
encoder.put(*value as u64);
num_encoded += 1;
}
encoder.flush()?;
encoder.flush();
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
Expand All @@ -121,25 +116,25 @@ impl LevelEncoder {
encoder.flush();
}
}
Ok(num_encoded)
num_encoded
}

/// Finalizes level encoder, flush all intermediate buffers and return resulting
/// encoded buffer. Returned buffer is already truncated to encoded bytes only.
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
pub fn consume(self) -> Vec<u8> {
match self {
LevelEncoder::Rle(encoder) => {
let mut encoded_data = encoder.consume()?;
let mut encoded_data = encoder.consume();
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
let len = (encoded_len as i32).to_le();
let len_bytes = len.as_bytes();
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
encoded_data
}
LevelEncoder::RleV2(encoder) => encoder.consume(),
LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
}
}
}
Expand Down Expand Up @@ -287,8 +282,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
encoder.put(levels);
let encoded_levels = encoder.consume();

let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
Expand Down Expand Up @@ -318,8 +313,8 @@ mod tests {
} else {
LevelEncoder::v1(enc, max_level, levels.len())
};
encoder.put(levels).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
encoder.put(levels);
let encoded_levels = encoder.consume();

let byte_buf = ByteBufferPtr::new(encoded_levels);
let mut decoder;
Expand Down Expand Up @@ -366,8 +361,8 @@ mod tests {
LevelEncoder::v1(enc, max_level, levels.len())
};
// Encode only one value
let num_encoded = encoder.put(&levels[0..1]).expect("put() should be OK");
let encoded_levels = encoder.consume().expect("consume() should be OK");
let num_encoded = encoder.put(&levels[0..1]);
let encoded_levels = encoder.consume();
assert_eq!(num_encoded, 1);

let byte_buf = ByteBufferPtr::new(encoded_levels);
Expand Down