Skip to content

Commit

Permalink
Fix some clippy lints in parquet crate, rename LevelEncoder variant…
Browse files Browse the repository at this point in the history
…s to conform to Rust standards (#1273)

* disallow vec_init_then_push

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow upper case acronyms

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow transmute ptr to ptr

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow same item push

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow approx constant

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow cast ptr alignment

Signed-off-by: remzi <13716567376yh@gmail.com>

* disallow float cmp

Signed-off-by: remzi <13716567376yh@gmail.com>

* check float equality with abs

Signed-off-by: remzi <13716567376yh@gmail.com>

* check incomplete features

Signed-off-by: remzi <13716567376yh@gmail.com>

* check single char names

Signed-off-by: remzi <13716567376yh@gmail.com>

* check needless range loop

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix more vec_init_then_push lint, especially in macro

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix more float dquality without abs

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix more vec push same items

Signed-off-by: remzi <13716567376yh@gmail.com>

* fix more needless range loop

Signed-off-by: remzi <13716567376yh@gmail.com>
  • Loading branch information
HaoYang670 committed Feb 8, 2022
1 parent 936ed5e commit dbc1752
Show file tree
Hide file tree
Showing 13 changed files with 89 additions and 135 deletions.
9 changes: 7 additions & 2 deletions parquet/src/arrow/record_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,13 @@ where
let mut records_read = 0;
let mut end_of_last_record = self.num_values;

for current in self.num_values..self.values_written {
if buf[current] == 0 && current != self.num_values {
for (current, item) in buf
.iter()
.enumerate()
.take(self.values_written)
.skip(self.num_values)
{
if *item == 0 && current != self.num_values {
records_read += 1;
end_of_last_record = current;

Expand Down
7 changes: 4 additions & 3 deletions parquet/src/arrow/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1955,9 +1955,10 @@ mod tests {
";
let parquet_group_type = parse_message_type(message_type).unwrap();

let mut key_value_metadata: Vec<KeyValue> = Vec::new();
key_value_metadata.push(KeyValue::new("foo".to_owned(), Some("bar".to_owned())));
key_value_metadata.push(KeyValue::new("baz".to_owned(), None));
let key_value_metadata = vec![
KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
KeyValue::new("baz".to_owned(), None)
];

let mut expected_metadata: HashMap<String, String> = HashMap::new();
expected_metadata.insert("foo".to_owned(), "bar".to_owned());
Expand Down
16 changes: 5 additions & 11 deletions parquet/src/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,17 +848,12 @@ pub(crate) mod private {
decoder.start += bytes_to_decode;

let mut pos = 0; // position in byte array
for i in 0..num_values {
for item in buffer.iter_mut().take(num_values) {
let elem0 = byteorder::LittleEndian::read_u32(&bytes[pos..pos + 4]);
let elem1 = byteorder::LittleEndian::read_u32(&bytes[pos + 4..pos + 8]);
let elem2 = byteorder::LittleEndian::read_u32(&bytes[pos + 8..pos + 12]);

buffer[i]
.as_mut_any()
.downcast_mut::<Self>()
.unwrap()
.set_data(elem0, elem1, elem2);

item.set_data(elem0, elem1, elem2);
pos += 12;
}
decoder.num_values -= num_values;
Expand Down Expand Up @@ -1002,16 +997,15 @@ pub(crate) mod private {
.as_mut()
.expect("set_data should have been called");
let num_values = std::cmp::min(buffer.len(), decoder.num_values);
for i in 0..num_values {

for item in buffer.iter_mut().take(num_values) {
let len = decoder.type_length as usize;

if data.len() < decoder.start + len {
return Err(eof_err!("Not enough bytes to decode"));
}

let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap();

val.set_data(data.range(decoder.start, len));
item.set_data(data.range(decoder.start, len));
decoder.start += len;
}
decoder.num_values -= num_values;
Expand Down
16 changes: 8 additions & 8 deletions parquet/src/encodings/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -729,11 +729,11 @@ impl<T: DataType> Decoder<T> for DeltaLengthByteArrayDecoder<T> {

let data = self.data.as_ref().unwrap();
let num_values = cmp::min(buffer.len(), self.num_values);
for i in 0..num_values {

for item in buffer.iter_mut().take(num_values) {
let len = self.lengths[self.current_idx] as usize;

buffer[i]
.as_mut_any()
item.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data.range(self.offset, len));
Expand Down Expand Up @@ -836,7 +836,7 @@ impl<'m, T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
let num_values = cmp::min(buffer.len(), self.num_values);
let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
for i in 0..num_values {
for item in buffer.iter_mut().take(num_values) {
// Process suffix
// TODO: this is awkward - maybe we should add a non-vectorized API?
let suffix_decoder = self.suffix_decoder.as_mut().expect("decoder not initialized");
Expand All @@ -854,12 +854,12 @@ impl<'m, T: DataType> Decoder<T> for DeltaByteArrayDecoder<T> {
let data = ByteBufferPtr::new(result.clone());

match ty {
Type::BYTE_ARRAY => buffer[i]
Type::BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<ByteArray>()
.unwrap()
.set_data(data),
Type::FIXED_LEN_BYTE_ARRAY => buffer[i]
Type::FIXED_LEN_BYTE_ARRAY => item
.as_mut_any()
.downcast_mut::<FixedLenByteArray>()
.unwrap()
Expand Down Expand Up @@ -1448,11 +1448,11 @@ mod tests {
#[allow(clippy::wrong_self_convention)]
fn to_byte_array(data: &[bool]) -> Vec<u8> {
let mut v = vec![];
for i in 0..data.len() {
for (i, item) in data.iter().enumerate() {
if i % 8 == 0 {
v.push(0);
}
if data[i] {
if *item {
set_array_bit(&mut v[..], i);
}
}
Expand Down
16 changes: 9 additions & 7 deletions parquet/src/encodings/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ pub trait Encoder<T: DataType> {
let num_values = values.len();
let mut buffer = Vec::with_capacity(num_values);
// TODO: this is pretty inefficient. Revisit in future.
for i in 0..num_values {
for (i, item) in values.iter().enumerate().take(num_values) {
if bit_util::get_bit(valid_bits, i) {
buffer.push(values[i].clone());
buffer.push(item.clone());
}
}
self.put(&buffer[..])?;
Expand Down Expand Up @@ -1128,11 +1128,13 @@ mod tests {
let mut decoder =
create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);

let mut input = vec![];
input.push(ByteArray::from("aa"));
input.push(ByteArray::from("aaa"));
input.push(ByteArray::from("aa"));
input.push(ByteArray::from("aaa"));
let input = vec![
ByteArray::from("aa"),
ByteArray::from("aaa"),
ByteArray::from("aa"),
ByteArray::from("aaa"),
];

let mut output = vec![ByteArray::default(); input.len()];

let mut result =
Expand Down
55 changes: 27 additions & 28 deletions parquet/src/encodings/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ pub fn max_buffer_size(
}

/// Encoder for definition/repetition levels.
/// Currently only supports RLE and BIT_PACKED (dev/null) encoding, including v2.
/// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
pub enum LevelEncoder {
RLE(RleEncoder),
RLE_V2(RleEncoder),
BIT_PACKED(u8, BitWriter),
Rle(RleEncoder),
RleV2(RleEncoder),
BitPacked(u8, BitWriter),
}

impl LevelEncoder {
Expand All @@ -68,7 +68,7 @@ impl LevelEncoder {
pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelEncoder::RLE(RleEncoder::new_from_buf(
Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
bit_width,
byte_buffer,
mem::size_of::<i32>(),
Expand All @@ -77,7 +77,7 @@ impl LevelEncoder {
// Here we set full byte buffer without adjusting for num_buffered_values,
// because byte buffer will already be allocated with size from
// `max_buffer_size()` method.
LevelEncoder::BIT_PACKED(
LevelEncoder::BitPacked(
bit_width,
BitWriter::new_from_buf(byte_buffer, 0),
)
Expand All @@ -90,7 +90,7 @@ impl LevelEncoder {
/// repetition and definition levels.
pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelEncoder::RLE_V2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
}

/// Put/encode levels vector into this level encoder.
Expand All @@ -103,8 +103,7 @@ impl LevelEncoder {
pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
let mut num_encoded = 0;
match *self {
LevelEncoder::RLE(ref mut encoder)
| LevelEncoder::RLE_V2(ref mut encoder) => {
LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
for value in buffer {
if !encoder.put(*value as u64)? {
return Err(general_err!("RLE buffer is full"));
Expand All @@ -113,7 +112,7 @@ impl LevelEncoder {
}
encoder.flush()?;
}
LevelEncoder::BIT_PACKED(bit_width, ref mut encoder) => {
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
if !encoder.put_value(*value as u64, bit_width as usize) {
return Err(general_err!("Not enough bytes left"));
Expand All @@ -131,7 +130,7 @@ impl LevelEncoder {
#[inline]
pub fn consume(self) -> Result<Vec<u8>> {
match self {
LevelEncoder::RLE(encoder) => {
LevelEncoder::Rle(encoder) => {
let mut encoded_data = encoder.consume()?;
// Account for the buffer offset
let encoded_len = encoded_data.len() - mem::size_of::<i32>();
Expand All @@ -140,8 +139,8 @@ impl LevelEncoder {
encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
Ok(encoded_data)
}
LevelEncoder::RLE_V2(encoder) => encoder.consume(),
LevelEncoder::BIT_PACKED(_, encoder) => Ok(encoder.consume()),
LevelEncoder::RleV2(encoder) => encoder.consume(),
LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
}
}
}
Expand All @@ -150,9 +149,9 @@ impl LevelEncoder {
/// Currently only supports RLE and BIT_PACKED encoding for Data Page v1 and
/// RLE for Data Page v2.
pub enum LevelDecoder {
RLE(Option<usize>, RleDecoder),
RLE_V2(Option<usize>, RleDecoder),
BIT_PACKED(Option<usize>, u8, BitReader),
Rle(Option<usize>, RleDecoder),
RleV2(Option<usize>, RleDecoder),
BitPacked(Option<usize>, u8, BitReader),
}

impl LevelDecoder {
Expand All @@ -166,9 +165,9 @@ impl LevelDecoder {
pub fn v1(encoding: Encoding, max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
match encoding {
Encoding::RLE => LevelDecoder::RLE(None, RleDecoder::new(bit_width)),
Encoding::RLE => LevelDecoder::Rle(None, RleDecoder::new(bit_width)),
Encoding::BIT_PACKED => {
LevelDecoder::BIT_PACKED(None, bit_width, BitReader::from(Vec::new()))
LevelDecoder::BitPacked(None, bit_width, BitReader::from(Vec::new()))
}
_ => panic!("Unsupported encoding type {}", encoding),
}
Expand All @@ -180,7 +179,7 @@ impl LevelDecoder {
/// To set data for this decoder, use `set_data_range` method.
pub fn v2(max_level: i16) -> Self {
let bit_width = log2(max_level as u64 + 1) as u8;
LevelDecoder::RLE_V2(None, RleDecoder::new(bit_width))
LevelDecoder::RleV2(None, RleDecoder::new(bit_width))
}

/// Sets data for this level decoder, and returns total number of bytes set.
Expand All @@ -194,14 +193,14 @@ impl LevelDecoder {
#[inline]
pub fn set_data(&mut self, num_buffered_values: usize, data: ByteBufferPtr) -> usize {
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder) => {
LevelDecoder::Rle(ref mut num_values, ref mut decoder) => {
*num_values = Some(num_buffered_values);
let i32_size = mem::size_of::<i32>();
let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
decoder.set_data(data.range(i32_size, data_size));
i32_size + data_size
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
*num_values = Some(num_buffered_values);
// Set appropriate number of bytes: if max size is larger than buffer -
// set full buffer
Expand All @@ -227,7 +226,7 @@ impl LevelDecoder {
len: usize,
) -> usize {
match *self {
LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
decoder.set_data(data.range(start, len));
*num_values = Some(num_buffered_values);
len
Expand All @@ -242,9 +241,9 @@ impl LevelDecoder {
#[inline]
pub fn is_data_set(&self) -> bool {
match self {
LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
LevelDecoder::Rle(ref num_values, _) => num_values.is_some(),
LevelDecoder::RleV2(ref num_values, _) => num_values.is_some(),
LevelDecoder::BitPacked(ref num_values, ..) => num_values.is_some(),
}
}

Expand All @@ -255,15 +254,15 @@ impl LevelDecoder {
pub fn get(&mut self, buffer: &mut [i16]) -> Result<usize> {
assert!(self.is_data_set(), "No data set for decoding");
match *self {
LevelDecoder::RLE(ref mut num_values, ref mut decoder)
| LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
LevelDecoder::Rle(ref mut num_values, ref mut decoder)
| LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
// Max length we can read
let len = cmp::min(num_values.unwrap(), buffer.len());
let values_read = decoder.get_batch::<i16>(&mut buffer[0..len])?;
*num_values = num_values.map(|len| len - values_read);
Ok(values_read)
}
LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
// When extracting values from bit reader, it might return more values
// than left because of padding to a full byte, we use
// num_values to track precise number of values.
Expand Down
15 changes: 5 additions & 10 deletions parquet/src/encodings/rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -664,13 +664,9 @@ mod tests {
#[test]
fn test_rle_specific_sequences() {
let mut expected_buffer = Vec::new();
let mut values = Vec::new();
for _ in 0..50 {
values.push(0);
}
for _ in 0..50 {
values.push(1);
}
let mut values = vec![0; 50];
values.resize(100, 1);

expected_buffer.push(50 << 1);
expected_buffer.push(0);
expected_buffer.push(50 << 1);
Expand All @@ -696,9 +692,8 @@ mod tests {
}
let num_groups = bit_util::ceil(100, 8) as u8;
expected_buffer.push(((num_groups << 1) as u8) | 1);
for _ in 1..(100 / 8) + 1 {
expected_buffer.push(0b10101010);
}
expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);

// For the last 4 0 and 1's, padded with 0.
expected_buffer.push(0b00001010);
validate_rle(
Expand Down
8 changes: 4 additions & 4 deletions parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,13 +1066,13 @@ mod tests {
rows,
"row count in metadata not equal to number of rows written"
);
for i in 0..reader.num_row_groups() {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter
.map(|elem| elem.get_int(0).unwrap())
.collect::<Vec<i32>>();
assert_eq!(res, data[i]);
assert_eq!(res, *item);
}
}

Expand Down Expand Up @@ -1153,13 +1153,13 @@ mod tests {
rows,
"row count in metadata not equal to number of rows written"
);
for i in 0..reader.num_row_groups() {
for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
let row_group_reader = reader.get_row_group(i).unwrap();
let iter = row_group_reader.get_row_iter(None).unwrap();
let res = iter
.map(|elem| elem.get_int(0).unwrap())
.collect::<Vec<i32>>();
assert_eq!(res, data[i]);
assert_eq!(res, *item);
}
}
}

0 comments on commit dbc1752

Please sign in to comment.