Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix some clippy lints in parquet crate, rename LevelEncoder variants to conform to Rust standards #1273

Merged
merged 16 commits into from
Feb 8, 2022
Merged
9 changes: 7 additions & 2 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,13 @@ where
let mut records_read = 0;
let mut end_of_last_record = self.num_values;

for current in self.num_values..self.values_written {
if buf[current] == 0 && current != self.num_values {
for (current, item) in buf
.iter()
.enumerate()
.take(self.values_written)
.skip(self.num_values)
{
if *item == 0 && current != self.num_values {
records_read += 1;
end_of_last_record = current;

Expand Down
7 changes: 4 additions & 3 deletions parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1955,9 +1955,10 @@ mod tests {
";
let parquet_group_type = parse_message_type(message_type).unwrap();

let mut key_value_metadata: Vec<KeyValue> = Vec::new();
key_value_metadata.push(KeyValue::new("foo".to_owned(), Some("bar".to_owned())));
key_value_metadata.push(KeyValue::new("baz".to_owned(), None));
let key_value_metadata = vec![
KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
KeyValue::new("baz".to_owned(), None)
];

let mut expected_metadata: HashMap<String, String> = HashMap::new();
expected_metadata.insert("foo".to_owned(), "bar".to_owned());
Expand Down
16 changes: 5 additions & 11 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -847,17 +847,12 @@ pub(crate) mod private {
decoder.start += bytes_to_decode;

let mut pos = 0; // position in byte array
for i in 0..num_values {
for item in buffer.iter_mut().take(num_values) {
let elem0 = byteorder::LittleEndian::read_u32(&bytes[pos..pos + 4]);
let elem1 = byteorder::LittleEndian::read_u32(&bytes[pos + 4..pos + 8]);
let elem2 = byteorder::LittleEndian::read_u32(&bytes[pos + 8..pos + 12]);

buffer[i]
.as_mut_any()
.downcast_mut::<Self>()
.unwrap()
.set_data(elem0, elem1, elem2);

item.set_data(elem0, elem1, elem2);
pos += 12;
}
decoder.num_values -= num_values;
Expand Down Expand Up @@ -1001,16 +996,15 @@ pub(crate) mod private {
.as_mut()
.expect("set_data should have been called");
let num_values = std::cmp::min(buffer.len(), decoder.num_values);
for i in 0..num_values {

for item in buffer.iter_mut().take(num_values) {
let len = decoder.type_length as usize;

if data.len() < decoder.start + len {
return Err(eof_err!("Not enough bytes to decode"));
}

let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap();

val.set_data(data.range(decoder.start, len));
item.set_data(data.range(decoder.start, len));
decoder.start += len;
}
decoder.num_values -= num_values;
Expand Down
16 changes: 8 additions & 8 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,11 +636,11 @@ impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {

let data = self.data.as_ref().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
for i in 0..num_values {

for item in buffer.iter_mut().take(num_values) {
let len = self.lengths[self.current_idx] as usize;

buffer[i]
.as_mut_any()
item.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data.range(self.offset, len));
Expand Down Expand Up @@ -743,7 +743,7 @@ impl<'m, T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for i in 0..num_values {
for item in buffer.iter_mut().take(num_values) {
// Process suffix
// TODO: this is awkward - maybe we should add a non-vectorized API?
let suffix_decoder = self.suffix_decoder.as_mut().expect("decoder not initialized");
Expand All @@ -761,12 +761,12 @@ impl<'m, T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
let data = ByteBufferPtr::new(result.clone());

match ty {
Type::BYTE_ARRAY => buffer[i]
Type::BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data),
Type::FIXED_LEN_BYTE_ARRAY => buffer[i]
Type::FIXED_LEN_BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.unwrap()
Expand Down Expand Up @@ -1340,11 +1340,11 @@ mod tests {
#[allow(clippy::wrong_self_convention)]
fn to_byte_array(data: &[bool]) -> Vec<u8> {
let mut v = vec![];
for i in 0..data.len() {
for (i, item) in data.iter().enumerate() {
if i % 8 == 0 {
v.push(0);
}
if data[i] {
if *item {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this type of change is not only more "idiomatic" I think it will also be faster (as bounds checks aren't being done on each access of data 👍 )

set_array_bit(&mut v[..], i);
}
}
Expand Down
16 changes: 9 additions & 7 deletions parquet/src/encodings/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ pub trait Encoder<T: DataType> {
let num_values = values.len();
let mut buffer = Vec::with_capacity(num_values);
// TODO: this is pretty inefficient. Revisit in future.
for i in 0..num_values {
for (i, item) in values.iter().enumerate().take(num_values) {
if bit_util::get_bit(valid_bits, i) {
buffer.push(values[i].clone());
buffer.push(item.clone());
}
}
self.put(&buffer[..])?;
Expand Down Expand Up @@ -1128,11 +1128,13 @@ mod tests {
let mut decoder =
create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);

let mut input = vec![];
input.push(ByteArray::from("aa"));
input.push(ByteArray::from("aaa"));
input.push(ByteArray::from("aa"));
input.push(ByteArray::from("aaa"));
let input = vec![
ByteArray::from("aa"),
ByteArray::from("aaa"),
ByteArray::from("aa"),
ByteArray::from("aaa"),
];

let mut output = vec![ByteArray::default(); input.len()];

let mut result =
Expand Down
55 changes: 27 additions & 28 deletions parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ pub fn max_buffer_size(
}

/// Encoder for definition/repetition levels.
/// Currently only supports RLE and BIT_PACKED (dev/null) encoding, including v2.
/// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
pub enum LevelEncoder {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is technically a breaking API change, but I think it is ok

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The encodings module is experimental and so this isn't actually a breaking change. Clippy ignores proposing breaking changes to the API (at least for this lint because the salt was strong).

RLE(RleEncoder),
RLE_V2(RleEncoder),
BIT_PACKED(u8, BitWriter),
Rle(RleEncoder),
RleV2(RleEncoder),
BitPacked(u8, BitWriter),
}

impl LevelEncoder {
Expand All @@ -68,7 +68,7 @@ impl LevelEncoder {
pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelEncoder::RLE(RleEncoder::new_from_buf(
Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
bit_width,
byte_buffer,
mem::size_of::<i32>(),
Expand All @@ -77,7 +77,7 @@ impl LevelEncoder {
// Here we set full byte buffer without adjusting for num_buffered_values,
// because byte buffer will already be allocated with size from
// `max_buffer_size()` method.
LevelEncoder::BIT_PACKED(
LevelEncoder::BitPacked(
bit_width,
BitWriter::new_from_buf(byte_buffer, 0),
)
Expand All @@ -90,7 +90,7 @@ impl LevelEncoder {
/// repetition and definition levels.
pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelEncoder::RLE_V2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
}

/// Put/encode levels vector into this level encoder.
Expand All @@ -103,8 +103,7 @@ impl LevelEncoder {
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
let mut num_encoded = 0;
match *self {
LevelEncoder::RLE(ref mut encoder)
| LevelEncoder::RLE_V2(ref mut encoder) => {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
Expand All @@ -113,7 +112,7 @@ impl LevelEncoder {
}
encoder.flush()?;
}
LevelEncoder::BIT_PACKED(bit_width, ref mut encoder) => {
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
if !encoder.put_value(*value as u64, bit_width as usize) {
return Err(general_err!("Not enough bytes left"));
Expand All @@ -131,7 +130,7 @@ impl LevelEncoder {
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
match self {
LevelEncoder::RLE(encoder) => {
LevelEncoder::Rle(encoder) => {
let mut encoded_data = encoder.consume()?;
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
Expand All @@ -140,8 +139,8 @@ impl LevelEncoder {
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
}
LevelEncoder::RLE_V2(encoder) => encoder.consume(),
LevelEncoder::BIT_PACKED(_, encoder) => Ok(encoder.consume()),
LevelEncoder::RleV2(encoder) => encoder.consume(),
LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
}
}
}
Expand All @@ -150,9 +149,9 @@ impl LevelEncoder {
/// Currently only supports RLE and BIT_PACKED encoding for Data Page v1 and
/// RLE for Data Page v2.
pub enum LevelDecoder {
RLE(Option<usize>, RleDecoder),
RLE_V2(Option<usize>, RleDecoder),
BIT_PACKED(Option<usize>, u8, BitReader),
Rle(Option<usize>, RleDecoder),
RleV2(Option<usize>, RleDecoder),
BitPacked(Option<usize>, u8, BitReader),
}

impl LevelDecoder {
Expand All @@ -166,9 +165,9 @@ impl LevelDecoder {
pub fn v1(encoding: Encoding, max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelDecoder::RLE(None, RleDecoder::new(bit_width)),
Encoding::RLE => LevelDecoder::Rle(None, RleDecoder::new(bit_width)),
Encoding::BIT_PACKED => {
LevelDecoder::BIT_PACKED(None, bit_width, BitReader::from(Vec::new()))
LevelDecoder::BitPacked(None, bit_width, BitReader::from(Vec::new()))
}
_ => panic!("Unsupported encoding type {}", encoding),
}
Expand All @@ -180,7 +179,7 @@ impl LevelDecoder {
/// To set data for this decoder, use `set_data_range` method.
pub fn v2(max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelDecoder::RLE_V2(None, RleDecoder::new(bit_width))
LevelDecoder::RleV2(None, RleDecoder::new(bit_width))
}

/// Sets data for this level decoder, and returns total number of bytes set.
Expand All @@ -194,14 +193,14 @@ impl LevelDecoder {
#[inline]
pub fn set_data(&mut self, num_buffered_values: usize, data: ByteBufferPtr) -> usize {
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder) => {
LevelDecoder::Rle(ref mut num_values, ref mut decoder) => {
*num_values = Some(num_buffered_values);
let i32_size = mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
decoder.set_data(data.range(i32_size, data_size));
i32_size + data_size
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
*num_values = Some(num_buffered_values);
// Set appropriate number of bytes: if max size is larger than buffer -
// set full buffer
Expand All @@ -227,7 +226,7 @@ impl LevelDecoder {
len: usize,
) -> usize {
match *self {
LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
decoder.set_data(data.range(start, len));
*num_values = Some(num_buffered_values);
len
Expand All @@ -242,9 +241,9 @@ impl LevelDecoder {
#[inline]
pub fn is_data_set(&self) -> bool {
match self {
LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
LevelDecoder::Rle(ref num_values, _) => num_values.is_some(),
LevelDecoder::RleV2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BitPacked(ref num_values, ..) => num_values.is_some(),
}
}

Expand All @@ -255,15 +254,15 @@ impl LevelDecoder {
pub fn get(&mut self, buffer: &mut [i16]) -> Result<usize> {
assert!(self.is_data_set(), "No data set for decoding");
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder)
| LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
LevelDecoder::Rle(ref mut num_values, ref mut decoder)
| LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
// Max length we can read
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read = decoder.get_batch::<i16>(&mut buffer[0..len])?;
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
// When extracting values from bit reader, it might return more values
// than left because of padding to a full byte, we use
// num_values to track precise number of values.
Expand Down
15 changes: 5 additions & 10 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,9 @@ mod tests {
#[test]
fn test_rle_specific_sequences() {
let mut expected_buffer = Vec::new();
let mut values = Vec::new();
for _ in 0..50 {
values.push(0);
}
for _ in 0..50 {
values.push(1);
}
let mut values = vec![0; 50];
values.resize(100, 1);

expected_buffer.push(50 << 1);
expected_buffer.push(0);
expected_buffer.push(50 << 1);
Expand All @@ -696,9 +692,8 @@ mod tests {
}
let num_groups = bit_util::ceil(100, 8) as u8;
expected_buffer.push(((num_groups << 1) as u8) | 1);
for _ in 1..(100 / 8) + 1 {
expected_buffer.push(0b10101010);
}
expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

// For the last 4 0 and 1's, padded with 0.
expected_buffer.push(0b00001010);
validate_rle(
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,13 +1066,13 @@ mod tests {
rows,
"row count in metadata not equal to number of rows written"
);
for i in 0..reader.num_row_groups() {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter
.map(|elem| elem.get_int(0).unwrap())
.collect::<Vec<i32>>();
assert_eq!(res, data[i]);
assert_eq!(res, *item);
}
}

Expand Down Expand Up @@ -1153,13 +1153,13 @@ mod tests {
rows,
"row count in metadata not equal to number of rows written"
);
for i in 0..reader.num_row_groups() {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter
.map(|elem| elem.get_int(0).unwrap())
.collect::<Vec<i32>>();
assert_eq!(res, data[i]);
assert_eq!(res, *item);
}
}
}