diff --git a/parquet/src/arrow/record_reader.rs b/parquet/src/arrow/record_reader.rs
index 2782adf7273..ed266c4cfc9 100644
--- a/parquet/src/arrow/record_reader.rs
+++ b/parquet/src/arrow/record_reader.rs
@@ -312,8 +312,13 @@ where
         let mut records_read = 0;
         let mut end_of_last_record = self.num_values;
 
-        for current in self.num_values..self.values_written {
-            if buf[current] == 0 && current != self.num_values {
+        for (current, item) in buf
+            .iter()
+            .enumerate()
+            .take(self.values_written)
+            .skip(self.num_values)
+        {
+            if *item == 0 && current != self.num_values {
                 records_read += 1;
                 end_of_last_record = current;
diff --git a/parquet/src/arrow/schema.rs b/parquet/src/arrow/schema.rs
index 198763b2c3d..72061e8ad5c 100644
--- a/parquet/src/arrow/schema.rs
+++ b/parquet/src/arrow/schema.rs
@@ -1955,9 +1955,10 @@ mod tests {
         ";
         let parquet_group_type = parse_message_type(message_type).unwrap();
 
-        let mut key_value_metadata: Vec<KeyValue> = Vec::new();
-        key_value_metadata.push(KeyValue::new("foo".to_owned(), Some("bar".to_owned())));
-        key_value_metadata.push(KeyValue::new("baz".to_owned(), None));
+        let key_value_metadata = vec![
+            KeyValue::new("foo".to_owned(), Some("bar".to_owned())),
+            KeyValue::new("baz".to_owned(), None)
+        ];
 
         let mut expected_metadata: HashMap<String, String> = HashMap::new();
         expected_metadata.insert("foo".to_owned(), "bar".to_owned());
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index fde46d5b36b..ae1e0136537 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -848,17 +848,12 @@ pub(crate) mod private {
             decoder.start += bytes_to_decode;
 
             let mut pos = 0; // position in byte array
-            for i in 0..num_values {
+            for item in buffer.iter_mut().take(num_values) {
                 let elem0 = byteorder::LittleEndian::read_u32(&bytes[pos..pos + 4]);
                 let elem1 = byteorder::LittleEndian::read_u32(&bytes[pos + 4..pos + 8]);
                 let elem2 = byteorder::LittleEndian::read_u32(&bytes[pos + 8..pos + 12]);
 
-                buffer[i]
-                    .as_mut_any()
-                    .downcast_mut::<Self>()
-                    .unwrap()
-                    .set_data(elem0, elem1, elem2);
-
+                item.set_data(elem0, elem1, elem2);
                 pos += 12;
             }
             decoder.num_values -= num_values;
@@ -1002,16 +997,15 @@ pub(crate) mod private {
                 .as_mut()
                 .expect("set_data should have been called");
             let num_values = std::cmp::min(buffer.len(), decoder.num_values);
-            for i in 0..num_values {
+
+            for item in buffer.iter_mut().take(num_values) {
                 let len = decoder.type_length as usize;
                 if data.len() < decoder.start + len {
                     return Err(eof_err!("Not enough bytes to decode"));
                 }
 
-                let val: &mut Self = buffer[i].as_mut_any().downcast_mut().unwrap();
-
-                val.set_data(data.range(decoder.start, len));
+                item.set_data(data.range(decoder.start, len));
                 decoder.start += len;
             }
             decoder.num_values -= num_values;
diff --git a/parquet/src/encodings/decoding.rs b/parquet/src/encodings/decoding.rs
index f044dd244d2..4374448f424 100644
--- a/parquet/src/encodings/decoding.rs
+++ b/parquet/src/encodings/decoding.rs
@@ -729,11 +729,11 @@ impl Decoder for DeltaLengthByteArrayDecoder {
         let data = self.data.as_ref().unwrap();
         let num_values = cmp::min(buffer.len(), self.num_values);
-        for i in 0..num_values {
+
+        for item in buffer.iter_mut().take(num_values) {
             let len = self.lengths[self.current_idx] as usize;
 
-            buffer[i]
-                .as_mut_any()
+            item.as_mut_any()
                 .downcast_mut::<ByteArray>()
                 .unwrap()
                 .set_data(data.range(self.offset, len));
@@ -836,7 +836,7 @@ impl<'m, T: DataType> Decoder for DeltaByteArrayDecoder {
             ty @ Type::BYTE_ARRAY | ty @ Type::FIXED_LEN_BYTE_ARRAY => {
                 let num_values = cmp::min(buffer.len(), self.num_values);
                 let mut v: [ByteArray; 1] = [ByteArray::new(); 1];
-                for i in 0..num_values {
+                for item in buffer.iter_mut().take(num_values) {
                     // Process suffix
                     // TODO: this is awkward - maybe we should add a non-vectorized API?
                     let suffix_decoder = self.suffix_decoder.as_mut().expect("decoder not initialized");
@@ -854,12 +854,12 @@ impl<'m, T: DataType> Decoder for DeltaByteArrayDecoder {
                     let data = ByteBufferPtr::new(result.clone());
 
                     match ty {
-                        Type::BYTE_ARRAY => buffer[i]
+                        Type::BYTE_ARRAY => item
                             .as_mut_any()
                             .downcast_mut::<ByteArray>()
                             .unwrap()
                             .set_data(data),
-                        Type::FIXED_LEN_BYTE_ARRAY => buffer[i]
+                        Type::FIXED_LEN_BYTE_ARRAY => item
                             .as_mut_any()
                             .downcast_mut::<FixedLenByteArray>()
                             .unwrap()
@@ -1448,11 +1448,11 @@ mod tests {
     #[allow(clippy::wrong_self_convention)]
     fn to_byte_array(data: &[bool]) -> Vec<u8> {
         let mut v = vec![];
-        for i in 0..data.len() {
+        for (i, item) in data.iter().enumerate() {
            if i % 8 == 0 {
                v.push(0);
            }
-            if data[i] {
+            if *item {
                set_array_bit(&mut v[..], i);
            }
        }
diff --git a/parquet/src/encodings/encoding.rs b/parquet/src/encodings/encoding.rs
index 4d67e65becb..b0893434a2c 100644
--- a/parquet/src/encodings/encoding.rs
+++ b/parquet/src/encodings/encoding.rs
@@ -50,9 +50,9 @@ pub trait Encoder {
         let num_values = values.len();
         let mut buffer = Vec::with_capacity(num_values);
         // TODO: this is pretty inefficient. Revisit in future.
-        for i in 0..num_values {
+        for (i, item) in values.iter().enumerate().take(num_values) {
             if bit_util::get_bit(valid_bits, i) {
-                buffer.push(values[i].clone());
+                buffer.push(item.clone());
             }
         }
         self.put(&buffer[..])?;
@@ -1128,11 +1128,13 @@ mod tests {
         let mut decoder =
            create_test_decoder::<ByteArrayType>(0, Encoding::DELTA_BYTE_ARRAY);
 
-        let mut input = vec![];
-        input.push(ByteArray::from("aa"));
-        input.push(ByteArray::from("aaa"));
-        input.push(ByteArray::from("aa"));
-        input.push(ByteArray::from("aaa"));
+        let input = vec![
+            ByteArray::from("aa"),
+            ByteArray::from("aaa"),
+            ByteArray::from("aa"),
+            ByteArray::from("aaa"),
+        ];
+
         let mut output = vec![ByteArray::default(); input.len()];
 
         let mut result =
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index b82d2959bec..deabbd44034 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -50,11 +50,11 @@ pub fn max_buffer_size(
 }
 
 /// Encoder for definition/repetition levels.
-/// Currently only supports RLE and BIT_PACKED (dev/null) encoding, including v2.
+/// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
 pub enum LevelEncoder {
-    RLE(RleEncoder),
-    RLE_V2(RleEncoder),
-    BIT_PACKED(u8, BitWriter),
+    Rle(RleEncoder),
+    RleV2(RleEncoder),
+    BitPacked(u8, BitWriter),
 }
 
 impl LevelEncoder {
@@ -68,7 +68,7 @@ impl LevelEncoder {
     pub fn v1(encoding: Encoding, max_level: i16, byte_buffer: Vec<u8>) -> Self {
         let bit_width = log2(max_level as u64 + 1) as u8;
         match encoding {
-            Encoding::RLE => LevelEncoder::RLE(RleEncoder::new_from_buf(
+            Encoding::RLE => LevelEncoder::Rle(RleEncoder::new_from_buf(
                 bit_width,
                 byte_buffer,
                 mem::size_of::<i32>(),
@@ -77,7 +77,7 @@ impl LevelEncoder {
                 // Here we set full byte buffer without adjusting for num_buffered_values,
                 // because byte buffer will already be allocated with size from
                 // `max_buffer_size()` method.
-                LevelEncoder::BIT_PACKED(
+                LevelEncoder::BitPacked(
                     bit_width,
                     BitWriter::new_from_buf(byte_buffer, 0),
                 )
@@ -90,7 +90,7 @@ impl LevelEncoder {
     /// repetition and definition levels.
     pub fn v2(max_level: i16, byte_buffer: Vec<u8>) -> Self {
         let bit_width = log2(max_level as u64 + 1) as u8;
-        LevelEncoder::RLE_V2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
+        LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, byte_buffer, 0))
     }
 
     /// Put/encode levels vector into this level encoder.
@@ -103,8 +103,7 @@ impl LevelEncoder {
     pub fn put(&mut self, buffer: &[i16]) -> Result<usize> {
         let mut num_encoded = 0;
         match *self {
-            LevelEncoder::RLE(ref mut encoder)
-            | LevelEncoder::RLE_V2(ref mut encoder) => {
+            LevelEncoder::Rle(ref mut encoder) | LevelEncoder::RleV2(ref mut encoder) => {
                 for value in buffer {
                     if !encoder.put(*value as u64)? {
                         return Err(general_err!("RLE buffer is full"));
@@ -113,7 +112,7 @@ impl LevelEncoder {
                 }
                 encoder.flush()?;
             }
-            LevelEncoder::BIT_PACKED(bit_width, ref mut encoder) => {
+            LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
                 for value in buffer {
                     if !encoder.put_value(*value as u64, bit_width as usize) {
                         return Err(general_err!("Not enough bytes left"));
@@ -131,7 +130,7 @@ impl LevelEncoder {
     #[inline]
     pub fn consume(self) -> Result<Vec<u8>> {
         match self {
-            LevelEncoder::RLE(encoder) => {
+            LevelEncoder::Rle(encoder) => {
                 let mut encoded_data = encoder.consume()?;
                 // Account for the buffer offset
                 let encoded_len = encoded_data.len() - mem::size_of::<i32>();
@@ -140,8 +139,8 @@ impl LevelEncoder {
                 encoded_data[0..len_bytes.len()].copy_from_slice(len_bytes);
                 Ok(encoded_data)
             }
-            LevelEncoder::RLE_V2(encoder) => encoder.consume(),
-            LevelEncoder::BIT_PACKED(_, encoder) => Ok(encoder.consume()),
+            LevelEncoder::RleV2(encoder) => encoder.consume(),
+            LevelEncoder::BitPacked(_, encoder) => Ok(encoder.consume()),
         }
     }
 }
@@ -150,9 +149,9 @@ impl LevelEncoder {
 /// Currently only supports RLE and BIT_PACKED encoding for Data Page v1 and
 /// RLE for Data Page v2.
 pub enum LevelDecoder {
-    RLE(Option<usize>, RleDecoder),
-    RLE_V2(Option<usize>, RleDecoder),
-    BIT_PACKED(Option<usize>, u8, BitReader),
+    Rle(Option<usize>, RleDecoder),
+    RleV2(Option<usize>, RleDecoder),
+    BitPacked(Option<usize>, u8, BitReader),
 }
 
 impl LevelDecoder {
@@ -166,9 +165,9 @@ impl LevelDecoder {
     pub fn v1(encoding: Encoding, max_level: i16) -> Self {
         let bit_width = log2(max_level as u64 + 1) as u8;
         match encoding {
-            Encoding::RLE => LevelDecoder::RLE(None, RleDecoder::new(bit_width)),
+            Encoding::RLE => LevelDecoder::Rle(None, RleDecoder::new(bit_width)),
             Encoding::BIT_PACKED => {
-                LevelDecoder::BIT_PACKED(None, bit_width, BitReader::from(Vec::new()))
+                LevelDecoder::BitPacked(None, bit_width, BitReader::from(Vec::new()))
             }
             _ => panic!("Unsupported encoding type {}", encoding),
         }
@@ -180,7 +179,7 @@ impl LevelDecoder {
     /// To set data for this decoder, use `set_data_range` method.
     pub fn v2(max_level: i16) -> Self {
         let bit_width = log2(max_level as u64 + 1) as u8;
-        LevelDecoder::RLE_V2(None, RleDecoder::new(bit_width))
+        LevelDecoder::RleV2(None, RleDecoder::new(bit_width))
     }
 
     /// Sets data for this level decoder, and returns total number of bytes set.
@@ -194,14 +193,14 @@ impl LevelDecoder {
     #[inline]
     pub fn set_data(&mut self, num_buffered_values: usize, data: ByteBufferPtr) -> usize {
         match *self {
-            LevelDecoder::RLE(ref mut num_values, ref mut decoder) => {
+            LevelDecoder::Rle(ref mut num_values, ref mut decoder) => {
                 *num_values = Some(num_buffered_values);
                 let i32_size = mem::size_of::<i32>();
                 let data_size = read_num_bytes!(i32, i32_size, data.as_ref()) as usize;
                 decoder.set_data(data.range(i32_size, data_size));
                 i32_size + data_size
             }
-            LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
+            LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
                 *num_values = Some(num_buffered_values);
                 // Set appropriate number of bytes: if max size is larger than buffer -
                 // set full buffer
@@ -227,7 +226,7 @@ impl LevelDecoder {
         len: usize,
     ) -> usize {
         match *self {
-            LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
+            LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
                 decoder.set_data(data.range(start, len));
                 *num_values = Some(num_buffered_values);
                 len
@@ -242,9 +241,9 @@ impl LevelDecoder {
     #[inline]
     pub fn is_data_set(&self) -> bool {
         match self {
-            LevelDecoder::RLE(ref num_values, _) => num_values.is_some(),
-            LevelDecoder::RLE_V2(ref num_values, _) => num_values.is_some(),
-            LevelDecoder::BIT_PACKED(ref num_values, ..) => num_values.is_some(),
+            LevelDecoder::Rle(ref num_values, _) => num_values.is_some(),
+            LevelDecoder::RleV2(ref num_values, _) => num_values.is_some(),
+            LevelDecoder::BitPacked(ref num_values, ..) => num_values.is_some(),
         }
     }
 
@@ -255,15 +254,15 @@ impl LevelDecoder {
     pub fn get(&mut self, buffer: &mut [i16]) -> Result<usize> {
         assert!(self.is_data_set(), "No data set for decoding");
         match *self {
-            LevelDecoder::RLE(ref mut num_values, ref mut decoder)
-            | LevelDecoder::RLE_V2(ref mut num_values, ref mut decoder) => {
+            LevelDecoder::Rle(ref mut num_values, ref mut decoder)
+            | LevelDecoder::RleV2(ref mut num_values, ref mut decoder) => {
                 // Max length we can read
                 let len = cmp::min(num_values.unwrap(), buffer.len());
                 let values_read = decoder.get_batch::<i16>(&mut buffer[0..len])?;
                 *num_values = num_values.map(|len| len - values_read);
                 Ok(values_read)
             }
-            LevelDecoder::BIT_PACKED(ref mut num_values, bit_width, ref mut decoder) => {
+            LevelDecoder::BitPacked(ref mut num_values, bit_width, ref mut decoder) => {
                 // When extracting values from bit reader, it might return more values
                 // than left because of padding to a full byte, we use
                 // num_values to track precise number of values.
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index f61215f0a5b..711f3a61557 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -664,13 +664,9 @@ mod tests {
     #[test]
     fn test_rle_specific_sequences() {
         let mut expected_buffer = Vec::new();
-        let mut values = Vec::new();
-        for _ in 0..50 {
-            values.push(0);
-        }
-        for _ in 0..50 {
-            values.push(1);
-        }
+        let mut values = vec![0; 50];
+        values.resize(100, 1);
+
         expected_buffer.push(50 << 1);
         expected_buffer.push(0);
         expected_buffer.push(50 << 1);
@@ -696,9 +692,8 @@ mod tests {
         }
         let num_groups = bit_util::ceil(100, 8) as u8;
         expected_buffer.push(((num_groups << 1) as u8) | 1);
-        for _ in 1..(100 / 8) + 1 {
-            expected_buffer.push(0b10101010);
-        }
+        expected_buffer.resize(expected_buffer.len() + 100 / 8, 0b10101010);
+
         // For the last 4 0 and 1's, padded with 0.
         expected_buffer.push(0b00001010);
 
         validate_rle(
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 86e4d8d7379..94a983fe3d6 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -1066,13 +1066,13 @@ mod tests {
             rows,
             "row count in metadata not equal to number of rows written"
         );
-        for i in 0..reader.num_row_groups() {
+        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
             let row_group_reader = reader.get_row_group(i).unwrap();
             let iter = row_group_reader.get_row_iter(None).unwrap();
             let res = iter
                 .map(|elem| elem.get_int(0).unwrap())
                 .collect::<Vec<i32>>();
-            assert_eq!(res, data[i]);
+            assert_eq!(res, *item);
         }
     }
 
@@ -1153,13 +1153,13 @@ mod tests {
             rows,
             "row count in metadata not equal to number of rows written"
         );
-        for i in 0..reader.num_row_groups() {
+        for (i, item) in data.iter().enumerate().take(reader.num_row_groups()) {
             let row_group_reader = reader.get_row_group(i).unwrap();
             let iter = row_group_reader.get_row_iter(None).unwrap();
             let res = iter
                 .map(|elem| elem.get_int(0).unwrap())
                 .collect::<Vec<i32>>();
-            assert_eq!(res, data[i]);
+            assert_eq!(res, *item);
         }
     }
 }
diff --git a/parquet/src/lib.rs b/parquet/src/lib.rs
index b50fab41184..e86b9e65917 100644
--- a/parquet/src/lib.rs
+++ b/parquet/src/lib.rs
@@ -30,24 +30,13 @@
 //!
 //! 3. [arrow::async_reader] for `async` reading and writing parquet
 //!    files to Arrow `RecordBatch`es (requires the `async` feature).
-#![allow(incomplete_features)]
 #![allow(dead_code)]
 #![allow(non_camel_case_types)]
 #![allow(
-    clippy::approx_constant,
-    clippy::cast_ptr_alignment,
-    clippy::float_cmp,
-    clippy::float_equality_without_abs,
     clippy::from_over_into,
-    clippy::many_single_char_names,
-    clippy::needless_range_loop,
     clippy::new_without_default,
     clippy::or_fun_call,
-    clippy::same_item_push,
-    clippy::too_many_arguments,
-    clippy::transmute_ptr_to_ptr,
-    clippy::upper_case_acronyms,
-    clippy::vec_init_then_push
+    clippy::too_many_arguments
 )]
 
 /// Defines a module with an experimental public API
diff --git a/parquet/src/record/api.rs b/parquet/src/record/api.rs
index f5b30753dfe..95b97bc9546 100644
--- a/parquet/src/record/api.rs
+++ b/parquet/src/record/api.rs
@@ -1376,8 +1376,8 @@ mod tests {
         assert_eq!(4, row.get_ushort(7).unwrap());
         assert_eq!(5, row.get_uint(8).unwrap());
         assert_eq!(6, row.get_ulong(9).unwrap());
-        assert!(7.1 - row.get_float(10).unwrap() < f32::EPSILON);
-        assert!(8.1 - row.get_double(11).unwrap() < f64::EPSILON);
+        assert!((7.1 - row.get_float(10).unwrap()).abs() < f32::EPSILON);
+        assert!((8.1 - row.get_double(11).unwrap()).abs() < f64::EPSILON);
         assert_eq!("abc", row.get_string(12).unwrap());
         assert_eq!(5, row.get_bytes(13).unwrap().len());
         assert_eq!(7, row.get_decimal(14).unwrap().precision());
@@ -1524,10 +1524,10 @@ mod tests {
             Field::Float(9.2),
             Field::Float(10.3),
         ]);
-        assert!(10.3 - list.get_float(2).unwrap() < f32::EPSILON);
+        assert!((10.3 - list.get_float(2).unwrap()).abs() < f32::EPSILON);
 
         let list = make_list(vec![Field::Double(3.1415)]);
-        assert!(3.1415 - list.get_double(0).unwrap() < f64::EPSILON);
+        assert!((3.1415 - list.get_double(0).unwrap()).abs() < f64::EPSILON);
 
         let list = make_list(vec![Field::Str("abc".to_string())]);
         assert_eq!(&"abc".to_string(), list.get_string(0).unwrap());
diff --git a/parquet/src/record/reader.rs b/parquet/src/record/reader.rs
index 475da44b3ac..05b63661f09 100644
--- a/parquet/src/record/reader.rs
+++ b/parquet/src/record/reader.rs
@@ -828,55 +828,25 @@ mod tests {
     // Convenient macros to assemble row, list, map, and group.
 
     macro_rules! row {
-        () => {
+        ($($e:tt)*) => {
             {
-                let result = Vec::new();
-                make_row(result)
-            }
-        };
-        ( $( $e:expr ), + ) => {
-            {
-                let mut result = Vec::new();
-                $(
-                    result.push($e);
-                )*
-                make_row(result)
+                make_row(vec![$($e)*])
             }
         }
     }
 
     macro_rules! list {
-        () => {
+        ($($e:tt)*) => {
             {
-                let result = Vec::new();
-                Field::ListInternal(make_list(result))
-            }
-        };
-        ( $( $e:expr ), + ) => {
-            {
-                let mut result = Vec::new();
-                $(
-                    result.push($e);
-                )*
-                Field::ListInternal(make_list(result))
+                Field::ListInternal(make_list(vec![$($e)*]))
             }
         }
     }
 
     macro_rules! map {
-        () => {
-            {
-                let result = Vec::new();
-                Field::MapInternal(make_map(result))
-            }
-        };
-        ( $( $e:expr ), + ) => {
+        ($($e:tt)*) => {
             {
-                let mut result = Vec::new();
-                $(
-                    result.push($e);
-                )*
-                Field::MapInternal(make_map(result))
+                Field::MapInternal(make_map(vec![$($e)*]))
             }
         }
     }
diff --git a/parquet/src/schema/printer.rs b/parquet/src/schema/printer.rs
index b78bce7a988..26d8b1b65ea 100644
--- a/parquet/src/schema/printer.rs
+++ b/parquet/src/schema/printer.rs
@@ -684,19 +684,20 @@ mod tests {
            .with_length(12)
            .with_id(2)
            .build();
-        let mut struct_fields = Vec::new();
-        struct_fields.push(Arc::new(f1.unwrap()));
-        struct_fields.push(Arc::new(f2.unwrap()));
-        struct_fields.push(Arc::new(f3.unwrap()));
+
+        let mut struct_fields = vec![
+            Arc::new(f1.unwrap()),
+            Arc::new(f2.unwrap()),
+            Arc::new(f3.unwrap()),
+        ];
         let field = Type::group_type_builder("field")
            .with_repetition(Repetition::OPTIONAL)
            .with_fields(&mut struct_fields)
            .with_id(1)
            .build()
            .unwrap();
-        let mut fields = Vec::new();
-        fields.push(Arc::new(field));
-        fields.push(Arc::new(f4.unwrap()));
+
+        let mut fields = vec![Arc::new(field), Arc::new(f4.unwrap())];
         let message = Type::group_type_builder("schema")
            .with_fields(&mut fields)
            .with_id(2)
diff --git a/parquet/src/schema/types.rs b/parquet/src/schema/types.rs
index d885ff1eb7f..8ae3c4c6e69 100644
--- a/parquet/src/schema/types.rs
+++ b/parquet/src/schema/types.rs
@@ -1492,9 +1492,7 @@ mod tests {
            .build();
         assert!(f2.is_ok());
 
-        let mut fields = vec![];
-        fields.push(Arc::new(f1.unwrap()));
-        fields.push(Arc::new(f2.unwrap()));
+        let mut fields = vec![Arc::new(f1.unwrap()), Arc::new(f2.unwrap())];
 
         let result = Type::group_type_builder("foo")
            .with_repetition(Repetition::REPEATED)