diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index ddf69232b84..dc89a9c3c7a 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -152,6 +152,7 @@ enum MaybePacked { } impl MaybePacked { + #[inline] fn packed(&mut self) -> &mut PackedDecoder { match self { Self::Packed(d) => d, @@ -159,6 +160,7 @@ impl MaybePacked { } } + #[inline] fn fallback(&mut self) -> &mut ColumnLevelDecoderImpl { match self { Self::Fallback(d) => d, @@ -252,7 +254,7 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { /// [RLE]: https://github.com/apache/parquet-format/blob/master/Encodings.md#run-length-encoding--bit-packing-hybrid-rle--3 /// [BIT_PACKED]: https://github.com/apache/parquet-format/blob/master/Encodings.md#bit-packed-deprecated-bit_packed--4 struct PackedDecoder { - data: Option, + data: ByteBufferPtr, data_offset: usize, rle_left: usize, rle_value: bool, @@ -261,11 +263,6 @@ struct PackedDecoder { } impl PackedDecoder { - #[inline] - fn data(&self) -> &[u8] { - self.data.as_ref().unwrap().data() - } - fn next_rle_block(&mut self) -> Result<()> { let indicator_value = self.decode_header()?; if indicator_value & 1 == 1 { @@ -274,7 +271,7 @@ impl PackedDecoder { self.packed_offset = 0; } else { self.rle_left = (indicator_value >> 1) as usize; - let byte = *self.data().get(self.data_offset).ok_or_else(|| { + let byte = *self.data.as_ref().get(self.data_offset).ok_or_else(|| { ParquetError::EOF( "unexpected end of file whilst decoding definition levels rle value" .into(), @@ -291,14 +288,17 @@ impl PackedDecoder { fn decode_header(&mut self) -> Result { let mut offset = 0; let mut v: i64 = 0; - let data = self.data(); while offset < 10 { - let byte = *data.get(self.data_offset + offset).ok_or_else(|| { - ParquetError::EOF( - "unexpected end of file whilst decoding definition levels rle header" - .into(), - ) - })?; + let byte = *self + .data + .as_ref() + .get(self.data_offset + offset) + .ok_or_else(|| { + ParquetError::EOF( + "unexpected end of file whilst decoding definition levels rle header" + .into(), + ) + })?; v |= ((byte & 0x7F) as i64) << (offset * 7); offset += 1; @@ -314,7 +314,7 @@ impl PackedDecoder { impl PackedDecoder { fn new() -> Self { Self { - data: None, + data: ByteBufferPtr::new(vec![]), data_offset: 0, rle_left: 0, rle_value: false, @@ -332,7 +332,7 @@ impl PackedDecoder { Encoding::BIT_PACKED => data.len() * 8, _ => unreachable!("invalid level encoding: {}", encoding), }; - self.data = Some(data); + self.data = data; self.data_offset = 0; } @@ -347,14 +347,14 @@ impl PackedDecoder { } else if self.packed_count != self.packed_offset { let to_read = (self.packed_count - self.packed_offset).min(len - read); let offset = self.data_offset * 8 + self.packed_offset; - buffer.append_packed_range(offset..offset + to_read, self.data()); + buffer.append_packed_range(offset..offset + to_read, self.data.as_ref()); self.packed_offset += to_read; read += to_read; if self.packed_offset == self.packed_count { self.data_offset += self.packed_count / 8; } - } else if self.data_offset == self.data().len() { + } else if self.data_offset == self.data.len() { break; } else { self.next_rle_block()?