From 51bbdbf6de5709873dbc881b92d15cd8a6acd296 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 8 Dec 2023 16:50:18 +0000 Subject: [PATCH 1/2] Use Vec in ColumnReader (#5177) --- parquet/src/arrow/array_reader/byte_array.rs | 34 ++-- .../array_reader/byte_array_dictionary.rs | 45 ++--- .../array_reader/fixed_len_byte_array.rs | 81 +++------ parquet/src/arrow/array_reader/mod.rs | 4 +- parquet/src/arrow/array_reader/null_array.rs | 10 +- .../src/arrow/array_reader/primitive_array.rs | 8 +- parquet/src/arrow/buffer/dictionary_buffer.rs | 45 +---- parquet/src/arrow/buffer/offset_buffer.rs | 28 +-- parquet/src/arrow/record_reader/buffer.rs | 67 +------ .../arrow/record_reader/definition_levels.rs | 57 +++--- parquet/src/arrow/record_reader/mod.rs | 72 ++------ parquet/src/column/reader.rs | 170 ++++-------------- parquet/src/column/reader/decoder.rs | 138 ++++++-------- parquet/src/column/writer/mod.rs | 43 ++--- parquet/src/file/serialized_reader.rs | 6 +- parquet/src/file/writer.rs | 4 +- parquet/src/record/triplet.rs | 17 +- 17 files changed, 256 insertions(+), 573 deletions(-) diff --git a/parquet/src/arrow/array_reader/byte_array.rs b/parquet/src/arrow/array_reader/byte_array.rs index debe0d6109eb..19086878c151 100644 --- a/parquet/src/arrow/array_reader/byte_array.rs +++ b/parquet/src/arrow/array_reader/byte_array.rs @@ -31,11 +31,10 @@ use crate::schema::types::ColumnDescPtr; use arrow_array::{ Array, ArrayRef, BinaryArray, Decimal128Array, Decimal256Array, OffsetSizeTrait, }; -use arrow_buffer::{i256, Buffer}; +use arrow_buffer::i256; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use std::any::Any; -use std::ops::Range; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided byte array column @@ -79,8 +78,8 @@ pub fn make_byte_array_reader( struct ByteArrayReader { data_type: ArrowType, pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, 
record_reader: GenericRecordReader, ByteArrayColumnValueDecoder>, } @@ -154,11 +153,11 @@ impl ArrayReader for ByteArrayReader { } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.rep_levels_buffer.as_deref() } } @@ -170,7 +169,7 @@ struct ByteArrayColumnValueDecoder { } impl ColumnValueDecoder for ByteArrayColumnValueDecoder { - type Slice = OffsetBuffer; + type Buffer = OffsetBuffer; fn new(desc: &ColumnDescPtr) -> Self { let validate_utf8 = desc.converted_type() == ConvertedType::UTF8; @@ -227,13 +226,13 @@ impl ColumnValueDecoder for ByteArrayColumnValueDecoder { Ok(()) } - fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { let decoder = self .decoder .as_mut() .ok_or_else(|| general_err!("no decoder set"))?; - decoder.read(out, range.end - range.start, self.dict.as_ref()) + decoder.read(out, num_values, self.dict.as_ref()) } fn skip_values(&mut self, num_values: usize) -> Result { @@ -590,6 +589,7 @@ mod tests { use crate::arrow::array_reader::test_util::{byte_array_all_encodings, utf8_column}; use crate::arrow::record_reader::buffer::ValuesBuffer; use arrow_array::{Array, StringArray}; + use arrow_buffer::Buffer; #[test] fn test_byte_array_decoder() { @@ -607,20 +607,20 @@ mod tests { let mut output = OffsetBuffer::::default(); decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); assert_eq!(output.values.as_slice(), "hello".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5]); - assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); assert_eq!(output.values.as_slice(), 
"helloworld".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5, 10]); - assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2); + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); assert_eq!(output.values.as_slice(), "helloworldab".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5, 10, 11, 12]); - assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); let valid = [false, false, true, true, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); @@ -662,7 +662,7 @@ mod tests { let mut output = OffsetBuffer::::default(); decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - assert_eq!(decoder.read(&mut output, 0..1).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); assert_eq!(output.values.as_slice(), "hello".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5]); @@ -670,11 +670,11 @@ mod tests { assert_eq!(decoder.skip_values(1).unwrap(), 1); assert_eq!(decoder.skip_values(1).unwrap(), 1); - assert_eq!(decoder.read(&mut output, 1..2).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); assert_eq!(output.values.as_slice(), "hellob".as_bytes()); assert_eq!(output.offsets.as_slice(), &[0, 5, 6]); - assert_eq!(decoder.read(&mut output, 4..8).unwrap(), 0); + assert_eq!(decoder.read(&mut output, 4).unwrap(), 0); let valid = [false, false, true, true, false, false]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); @@ -705,7 +705,7 @@ mod tests { for (encoding, page) in pages.clone() { let mut output = OffsetBuffer::::default(); decoder.set_data(encoding, page, 4, None).unwrap(); - assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); } // test nulls skip diff --git a/parquet/src/arrow/array_reader/byte_array_dictionary.rs b/parquet/src/arrow/array_reader/byte_array_dictionary.rs index a38122354145..3678f24621f8 100644 --- 
a/parquet/src/arrow/array_reader/byte_array_dictionary.rs +++ b/parquet/src/arrow/array_reader/byte_array_dictionary.rs @@ -17,18 +17,16 @@ use std::any::Any; use std::marker::PhantomData; -use std::ops::Range; use std::sync::Arc; use arrow_array::{Array, ArrayRef, OffsetSizeTrait}; -use arrow_buffer::{ArrowNativeType, Buffer}; +use arrow_buffer::ArrowNativeType; use arrow_schema::DataType as ArrowType; use bytes::Bytes; use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain}; use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer}; -use crate::arrow::record_reader::buffer::BufferQueue; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{ConvertedType, Encoding}; @@ -124,8 +122,8 @@ pub fn make_byte_array_dictionary_reader( struct ByteArrayDictionaryReader { data_type: ArrowType, pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, record_reader: GenericRecordReader, DictionaryDecoder>, } @@ -183,11 +181,11 @@ where } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.rep_levels_buffer.as_deref() } } @@ -224,7 +222,7 @@ where K: FromBytes + Ord + ArrowNativeType, V: OffsetSizeTrait, { - type Slice = DictionaryBuffer; + type Buffer = DictionaryBuffer; fn new(col: &ColumnDescPtr) -> Self { let validate_utf8 = col.converted_type() == ConvertedType::UTF8; @@ -306,16 +304,16 @@ where Ok(()) } - fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { match self.decoder.as_mut().expect("decoder 
set") { MaybeDictionaryDecoder::Fallback(decoder) => { - decoder.read(out.spill_values()?, range.end - range.start, None) + decoder.read(out.spill_values()?, num_values, None) } MaybeDictionaryDecoder::Dict { decoder, max_remaining_values, } => { - let len = (range.end - range.start).min(*max_remaining_values); + let len = num_values.min(*max_remaining_values); let dict = self .dict @@ -332,8 +330,12 @@ where Some(keys) => { // Happy path - can just copy keys // Keys will be validated on conversion to arrow - let keys_slice = keys.get_output_slice(len); - let len = decoder.get_batch(keys_slice)?; + + // TODO: Push vec into decoder (#5177) + let start = keys.len(); + keys.resize(start + len, K::default()); + let len = decoder.get_batch(&mut keys[start..])?; + keys.truncate(start + len); *max_remaining_values -= len; Ok(len) } @@ -381,6 +383,7 @@ where mod tests { use arrow::compute::cast; use arrow_array::{Array, StringArray}; + use arrow_buffer::Buffer; use crate::arrow::array_reader::test_util::{ byte_array_all_encodings, encode_dictionary, utf8_column, @@ -416,7 +419,7 @@ mod tests { .unwrap(); let mut output = DictionaryBuffer::::default(); - assert_eq!(decoder.read(&mut output, 0..3).unwrap(), 3); + assert_eq!(decoder.read(&mut output, 3).unwrap(), 3); let mut valid = vec![false, false, true, true, false, true]; let valid_buffer = Buffer::from_iter(valid.iter().cloned()); @@ -424,7 +427,7 @@ mod tests { assert!(matches!(output, DictionaryBuffer::Dict { .. 
})); - assert_eq!(decoder.read(&mut output, 0..4).unwrap(), 4); + assert_eq!(decoder.read(&mut output, 4).unwrap(), 4); valid.extend_from_slice(&[false, false, true, true, false, true, true, false]); let valid_buffer = Buffer::from_iter(valid.iter().cloned()); @@ -484,17 +487,17 @@ mod tests { let mut output = DictionaryBuffer::::default(); // read two skip one - assert_eq!(decoder.read(&mut output, 0..2).unwrap(), 2); + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); assert_eq!(decoder.skip_values(1).unwrap(), 1); assert!(matches!(output, DictionaryBuffer::Dict { .. })); // read two skip one - assert_eq!(decoder.read(&mut output, 2..4).unwrap(), 2); + assert_eq!(decoder.read(&mut output, 2).unwrap(), 2); assert_eq!(decoder.skip_values(1).unwrap(), 1); // read one and test on skip at the end - assert_eq!(decoder.read(&mut output, 4..5).unwrap(), 1); + assert_eq!(decoder.read(&mut output, 1).unwrap(), 1); assert_eq!(decoder.skip_values(4).unwrap(), 0); let valid = [true, true, true, true, true]; @@ -536,7 +539,7 @@ mod tests { for (encoding, page) in pages { decoder.set_data(encoding, page, 4, Some(4)).unwrap(); - assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 4); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4); } let array = output.into_array(None, &data_type).unwrap(); assert_eq!(array.data_type(), &data_type); @@ -580,7 +583,7 @@ mod tests { for (encoding, page) in pages { decoder.set_data(encoding, page, 4, Some(4)).unwrap(); decoder.skip_values(2).expect("skipping two values"); - assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 2); + assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2); } let array = output.into_array(None, &data_type).unwrap(); assert_eq!(array.data_type(), &data_type); @@ -641,7 +644,7 @@ mod tests { for (encoding, page) in pages.clone() { let mut output = DictionaryBuffer::::default(); decoder.set_data(encoding, page, 8, None).unwrap(); - assert_eq!(decoder.read(&mut output, 0..1024).unwrap(), 0); + 
assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0); output.pad_nulls(0, 0, 8, &[0]); let array = output diff --git a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs index 849aa37c561f..a0d25d403c1b 100644 --- a/parquet/src/arrow/array_reader/fixed_len_byte_array.rs +++ b/parquet/src/arrow/array_reader/fixed_len_byte_array.rs @@ -18,12 +18,12 @@ use crate::arrow::array_reader::{read_records, skip_records, ArrayReader}; use crate::arrow::buffer::bit_util::{iter_set_bits_rev, sign_extend_be}; use crate::arrow::decoder::{DeltaByteArrayDecoder, DictIndexDecoder}; -use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; +use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::arrow::record_reader::GenericRecordReader; use crate::arrow::schema::parquet_to_arrow_field; use crate::basic::{Encoding, Type}; use crate::column::page::PageIterator; -use crate::column::reader::decoder::{ColumnValueDecoder, ValuesBufferSlice}; +use crate::column::reader::decoder::ColumnValueDecoder; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use arrow_array::{ @@ -36,7 +36,6 @@ use arrow_schema::{DataType as ArrowType, IntervalUnit}; use bytes::Bytes; use half::f16; use std::any::Any; -use std::ops::Range; use std::sync::Arc; /// Returns an [`ArrayReader`] that decodes the provided fixed length byte array column @@ -117,8 +116,8 @@ struct FixedLenByteArrayReader { data_type: ArrowType, byte_length: usize, pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, record_reader: GenericRecordReader, } @@ -135,13 +134,7 @@ impl FixedLenByteArrayReader { pages, def_levels_buffer: None, rep_levels_buffer: None, - record_reader: GenericRecordReader::new_with_records( - column_desc, - FixedLenByteArrayBuffer { - buffer: Default::default(), - byte_length, - }, - ), + record_reader: 
GenericRecordReader::new(column_desc), } } } @@ -164,7 +157,7 @@ impl ArrayReader for FixedLenByteArrayReader { let array_data = ArrayDataBuilder::new(ArrowType::FixedSizeBinary(self.byte_length as i32)) .len(self.record_reader.num_values()) - .add_buffer(record_data) + .add_buffer(Buffer::from_vec(record_data.buffer)) .null_bit_buffer(self.record_reader.consume_bitmap_buffer()); let binary = FixedSizeBinaryArray::from(unsafe { array_data.build_unchecked() }); @@ -231,41 +224,19 @@ impl ArrayReader for FixedLenByteArrayReader { } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.rep_levels_buffer.as_deref() } } +#[derive(Default)] struct FixedLenByteArrayBuffer { buffer: Vec, /// The length of each element in bytes - byte_length: usize, -} - -impl ValuesBufferSlice for FixedLenByteArrayBuffer { - fn capacity(&self) -> usize { - usize::MAX - } -} - -impl BufferQueue for FixedLenByteArrayBuffer { - type Output = Buffer; - type Slice = Self; - - fn consume(&mut self) -> Self::Output { - Buffer::from_vec(self.buffer.consume()) - } - - fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice { - self - } - - fn truncate_buffer(&mut self, len: usize) { - assert_eq!(self.buffer.len(), len * self.byte_length); - } + byte_length: Option, } impl ValuesBuffer for FixedLenByteArrayBuffer { @@ -276,12 +247,11 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { levels_read: usize, valid_mask: &[u8], ) { - assert_eq!( - self.buffer.len(), - (read_offset + values_read) * self.byte_length - ); + let byte_length = self.byte_length.unwrap_or_default(); + + assert_eq!(self.buffer.len(), (read_offset + values_read) * byte_length); self.buffer - .resize((read_offset + levels_read) * self.byte_length, 0); + .resize((read_offset + levels_read) * byte_length, 0); let 
values_range = read_offset..read_offset + values_read; for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { @@ -290,10 +260,10 @@ impl ValuesBuffer for FixedLenByteArrayBuffer { break; } - let level_pos_bytes = level_pos * self.byte_length; - let value_pos_bytes = value_pos * self.byte_length; + let level_pos_bytes = level_pos * byte_length; + let value_pos_bytes = value_pos * byte_length; - for i in 0..self.byte_length { + for i in 0..byte_length { self.buffer[level_pos_bytes + i] = self.buffer[value_pos_bytes + i] } } @@ -307,7 +277,7 @@ struct ValueDecoder { } impl ColumnValueDecoder for ValueDecoder { - type Slice = FixedLenByteArrayBuffer; + type Buffer = FixedLenByteArrayBuffer; fn new(col: &ColumnDescPtr) -> Self { Self { @@ -374,13 +344,16 @@ impl ColumnValueDecoder for ValueDecoder { Ok(()) } - fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { - assert_eq!(self.byte_length, out.byte_length); + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { + match out.byte_length { + Some(x) => assert_eq!(x, self.byte_length), + None => out.byte_length = Some(self.byte_length), + } - let len = range.end - range.start; match self.decoder.as_mut().unwrap() { Decoder::Plain { offset, buf } => { - let to_read = (len * self.byte_length).min(buf.len() - *offset) / self.byte_length; + let to_read = + (num_values * self.byte_length).min(buf.len() - *offset) / self.byte_length; let end_offset = *offset + to_read * self.byte_length; out.buffer .extend_from_slice(&buf.as_ref()[*offset..end_offset]); @@ -394,7 +367,7 @@ impl ColumnValueDecoder for ValueDecoder { return Ok(0); } - decoder.read(len, |keys| { + decoder.read(num_values, |keys| { out.buffer.reserve(keys.len() * self.byte_length); for key in keys { let offset = *key as usize * self.byte_length; @@ -405,7 +378,7 @@ impl ColumnValueDecoder for ValueDecoder { }) } Decoder::Delta { decoder } => { - let to_read = len.min(decoder.remaining()); + let 
to_read = num_values.min(decoder.remaining()); out.buffer.reserve(to_read * self.byte_length); decoder.read(to_read, |slice| { diff --git a/parquet/src/arrow/array_reader/mod.rs b/parquet/src/arrow/array_reader/mod.rs index a4ee5040590e..c4e9fc5fa066 100644 --- a/parquet/src/arrow/array_reader/mod.rs +++ b/parquet/src/arrow/array_reader/mod.rs @@ -129,7 +129,7 @@ fn read_records( ) -> Result where V: ValuesBuffer, - CV: ColumnValueDecoder, + CV: ColumnValueDecoder, { let mut records_read = 0usize; while records_read < batch_size { @@ -163,7 +163,7 @@ fn skip_records( ) -> Result where V: ValuesBuffer, - CV: ColumnValueDecoder, + CV: ColumnValueDecoder, { let mut records_skipped = 0usize; while records_skipped < batch_size { diff --git a/parquet/src/arrow/array_reader/null_array.rs b/parquet/src/arrow/array_reader/null_array.rs index bb32fb307fda..838db854e05f 100644 --- a/parquet/src/arrow/array_reader/null_array.rs +++ b/parquet/src/arrow/array_reader/null_array.rs @@ -22,7 +22,7 @@ use crate::data_type::DataType; use crate::errors::Result; use crate::schema::types::ColumnDescPtr; use arrow_array::ArrayRef; -use arrow_buffer::{ArrowNativeType, Buffer}; +use arrow_buffer::ArrowNativeType; use arrow_schema::DataType as ArrowType; use std::any::Any; use std::sync::Arc; @@ -36,8 +36,8 @@ where { data_type: ArrowType, pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, record_reader: RecordReader, } @@ -99,10 +99,10 @@ where } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.rep_levels_buffer.as_deref() } } diff --git a/parquet/src/arrow/array_reader/primitive_array.rs b/parquet/src/arrow/array_reader/primitive_array.rs index 507b6215cacb..07ecc27d9b0b 100644 --- 
a/parquet/src/arrow/array_reader/primitive_array.rs +++ b/parquet/src/arrow/array_reader/primitive_array.rs @@ -77,8 +77,8 @@ where { data_type: ArrowType, pages: Box, - def_levels_buffer: Option, - rep_levels_buffer: Option, + def_levels_buffer: Option>, + rep_levels_buffer: Option>, record_reader: RecordReader, } @@ -287,11 +287,11 @@ where } fn get_def_levels(&self) -> Option<&[i16]> { - self.def_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.def_levels_buffer.as_deref() } fn get_rep_levels(&self) -> Option<&[i16]> { - self.rep_levels_buffer.as_ref().map(|buf| buf.typed_data()) + self.rep_levels_buffer.as_deref() } } diff --git a/parquet/src/arrow/buffer/dictionary_buffer.rs b/parquet/src/arrow/buffer/dictionary_buffer.rs index d0f63024edf0..9e5b2293aa01 100644 --- a/parquet/src/arrow/buffer/dictionary_buffer.rs +++ b/parquet/src/arrow/buffer/dictionary_buffer.rs @@ -16,8 +16,7 @@ // under the License. use crate::arrow::buffer::offset_buffer::OffsetBuffer; -use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; -use crate::column::reader::decoder::ValuesBufferSlice; +use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::errors::{ParquetError, Result}; use arrow_array::{make_array, Array, ArrayRef, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer}; @@ -185,12 +184,6 @@ impl DictionaryBuffer { } } -impl ValuesBufferSlice for DictionaryBuffer { - fn capacity(&self) -> usize { - usize::MAX - } -} - impl ValuesBuffer for DictionaryBuffer { fn pad_nulls( &mut self, @@ -211,34 +204,6 @@ impl ValuesBuffer for DictionaryBuffer BufferQueue for DictionaryBuffer { - type Output = Self; - type Slice = Self; - - fn consume(&mut self) -> Self::Output { - match self { - Self::Dict { keys, values } => Self::Dict { - keys: std::mem::take(keys), - values: values.clone(), - }, - Self::Values { values } => Self::Values { - values: values.consume(), - }, - } - } - - fn get_output_slice(&mut self, _batch_size: usize) -> &mut 
Self::Slice { - self - } - - fn truncate_buffer(&mut self, len: usize) { - match self { - Self::Dict { keys, .. } => keys.truncate_buffer(len), - Self::Values { values } => values.truncate_buffer(len), - } - } -} - #[cfg(test)] mod tests { use super::*; @@ -274,7 +239,7 @@ mod tests { buffer.pad_nulls(read_offset, 2, 5, null_buffer.as_slice()); assert_eq!(buffer.len(), 13); - let split = buffer.consume(); + let split = std::mem::take(&mut buffer); let array = split.into_array(Some(null_buffer), &dict_type).unwrap(); assert_eq!(array.data_type(), &dict_type); @@ -309,7 +274,9 @@ mod tests { .unwrap() .extend_from_slice(&[0, 1, 0, 1]); - let array = buffer.consume().into_array(None, &dict_type).unwrap(); + let array = std::mem::take(&mut buffer) + .into_array(None, &dict_type) + .unwrap(); assert_eq!(array.data_type(), &dict_type); let strings = cast(&array, &ArrowType::Utf8).unwrap(); @@ -320,7 +287,7 @@ mod tests { ); // Can recreate with new dictionary as keys empty - assert!(matches!(&buffer, DictionaryBuffer::Dict { .. })); + assert!(matches!(&buffer, DictionaryBuffer::Values { .. })); assert_eq!(buffer.len(), 0); let d3 = Arc::new(StringArray::from(vec!["bongo"])) as ArrayRef; buffer.as_keys(&d3).unwrap().extend_from_slice(&[0, 0]); diff --git a/parquet/src/arrow/buffer/offset_buffer.rs b/parquet/src/arrow/buffer/offset_buffer.rs index 459c94ed2803..ce9eb1142a5b 100644 --- a/parquet/src/arrow/buffer/offset_buffer.rs +++ b/parquet/src/arrow/buffer/offset_buffer.rs @@ -16,8 +16,7 @@ // under the License. 
use crate::arrow::buffer::bit_util::iter_set_bits_rev; -use crate::arrow::record_reader::buffer::{BufferQueue, ValuesBuffer}; -use crate::column::reader::decoder::ValuesBufferSlice; +use crate::arrow::record_reader::buffer::ValuesBuffer; use crate::errors::{ParquetError, Result}; use arrow_array::{make_array, ArrayRef, OffsetSizeTrait}; use arrow_buffer::{ArrowNativeType, Buffer}; @@ -141,23 +140,6 @@ impl OffsetBuffer { } } -impl BufferQueue for OffsetBuffer { - type Output = Self; - type Slice = Self; - - fn consume(&mut self) -> Self::Output { - std::mem::take(self) - } - - fn get_output_slice(&mut self, _batch_size: usize) -> &mut Self::Slice { - self - } - - fn truncate_buffer(&mut self, len: usize) { - assert_eq!(self.offsets.len(), len + 1); - } -} - impl ValuesBuffer for OffsetBuffer { fn pad_nulls( &mut self, @@ -208,12 +190,6 @@ impl ValuesBuffer for OffsetBuffer { } } -impl ValuesBufferSlice for OffsetBuffer { - fn capacity(&self) -> usize { - usize::MAX - } -} - #[cfg(test)] mod tests { use super::*; @@ -250,7 +226,7 @@ mod tests { for v in ["hello", "world", "cupcakes", "a", "b", "c"] { buffer.try_push(v.as_bytes(), false).unwrap() } - let split = buffer.consume(); + let split = std::mem::take(&mut buffer); let array = split.into_array(None, ArrowType::Utf8); let strings = array.as_any().downcast_ref::().unwrap(); diff --git a/parquet/src/arrow/record_reader/buffer.rs b/parquet/src/arrow/record_reader/buffer.rs index 3914710ff7b9..880407a54745 100644 --- a/parquet/src/arrow/record_reader/buffer.rs +++ b/parquet/src/arrow/record_reader/buffer.rs @@ -17,69 +17,8 @@ use crate::arrow::buffer::bit_util::iter_set_bits_rev; -/// A buffer that supports writing new data to the end, and removing data from the front -/// -/// Used by [RecordReader](`super::RecordReader`) to buffer up values before returning a -/// potentially smaller number of values, corresponding to a whole number of semantic records -pub trait BufferQueue: Sized { - type Output: Sized; - - 
type Slice: ?Sized; - - /// Consumes the contents of this [`BufferQueue`] - fn consume(&mut self) -> Self::Output; - - /// Returns a [`Self::Slice`] with at least `batch_size` capacity that can be used - /// to append data to the end of this [`BufferQueue`] - /// - /// NB: writes to the returned slice will not update the length of [`BufferQueue`] - /// instead a subsequent call should be made to [`BufferQueue::truncate_buffer`] - fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice; - - /// Sets the length of the [`BufferQueue`]. - /// - /// Intended to be used in combination with [`BufferQueue::get_output_slice`] - /// - /// # Panics - /// - /// Implementations must panic if `len` is beyond the initialized length - /// - /// Implementations may panic if `set_len` is called with less than what has been written - /// - /// This distinction is to allow for implementations that return a default initialized - /// [BufferQueue::Slice`] which doesn't track capacity and length separately - /// - /// For example, [`BufferQueue`] returns a default-initialized `&mut [T]`, and does not - /// track how much of this slice is actually written to by the caller. This is still - /// safe as the slice is default-initialized. - /// - fn truncate_buffer(&mut self, len: usize); -} - -impl BufferQueue for Vec { - type Output = Self; - - type Slice = [T]; - - fn consume(&mut self) -> Self::Output { - std::mem::take(self) - } - - fn get_output_slice(&mut self, batch_size: usize) -> &mut Self::Slice { - let len = self.len(); - self.resize(len + batch_size, T::default()); - &mut self[len..] 
- } - - fn truncate_buffer(&mut self, len: usize) { - assert!(len <= self.len()); - self.truncate(len) - } -} - -/// A [`BufferQueue`] capable of storing column values -pub trait ValuesBuffer: BufferQueue { - /// +/// A buffer that supports padding with nulls +pub trait ValuesBuffer: Default { /// If a column contains nulls, more level data may be read than value data, as null /// values are not encoded. Therefore, first the levels data is read, the null count /// determined, and then the corresponding number of values read to a [`ValuesBuffer`]. @@ -111,7 +50,7 @@ impl ValuesBuffer for Vec { levels_read: usize, valid_mask: &[u8], ) { - assert!(self.len() >= read_offset + levels_read); + self.resize(read_offset + levels_read, T::default()); let values_range = read_offset..read_offset + values_read; for (value_pos, level_pos) in values_range.rev().zip(iter_set_bits_rev(valid_mask)) { diff --git a/parquet/src/arrow/record_reader/definition_levels.rs b/parquet/src/arrow/record_reader/definition_levels.rs index fa041f5fdb0a..87f531df0990 100644 --- a/parquet/src/arrow/record_reader/definition_levels.rs +++ b/parquet/src/arrow/record_reader/definition_levels.rs @@ -15,8 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-use std::ops::Range; - use arrow_array::builder::BooleanBufferBuilder; use arrow_buffer::bit_chunk_iterator::UnalignedBitChunk; use arrow_buffer::Buffer; @@ -25,7 +23,7 @@ use bytes::Bytes; use crate::arrow::buffer::bit_util::count_set_bits; use crate::basic::Encoding; use crate::column::reader::decoder::{ - ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, LevelsBufferSlice, + ColumnLevelDecoder, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, }; use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; @@ -85,18 +83,13 @@ impl DefinitionLevelBuffer { } /// Returns the built level data - pub fn consume_levels(&mut self) -> Option { + pub fn consume_levels(&mut self) -> Option> { match &mut self.inner { - BufferInner::Full { levels, .. } => Some(Buffer::from_vec(std::mem::take(levels))), + BufferInner::Full { levels, .. } => Some(std::mem::take(levels)), BufferInner::Mask { .. } => None, } } - pub fn set_len(&mut self, len: usize) { - assert_eq!(self.nulls().len(), len); - self.len = len; - } - /// Returns the built null bitmask pub fn consume_bitmask(&mut self) -> Buffer { self.len = 0; @@ -114,18 +107,6 @@ impl DefinitionLevelBuffer { } } -impl LevelsBufferSlice for DefinitionLevelBuffer { - fn capacity(&self) -> usize { - usize::MAX - } - - fn count_nulls(&self, range: Range, _max_level: i16) -> usize { - let total_count = range.end - range.start; - let range = range.start + self.len..range.end + self.len; - total_count - count_set_bits(self.nulls().as_slice(), range) - } -} - enum MaybePacked { Packed(PackedDecoder), Fallback(DefinitionLevelDecoderImpl), @@ -148,7 +129,7 @@ impl DefinitionLevelBufferDecoder { } impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { - type Slice = DefinitionLevelBuffer; + type Buffer = DefinitionLevelBuffer; fn set_data(&mut self, encoding: Encoding, data: Bytes) { match &mut self.decoder { @@ -159,7 +140,11 @@ impl ColumnLevelDecoder for DefinitionLevelBufferDecoder { } 
impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { - fn read_def_levels(&mut self, writer: &mut Self::Slice, range: Range) -> Result { + fn read_def_levels( + &mut self, + writer: &mut Self::Buffer, + num_levels: usize, + ) -> Result<(usize, usize)> { match (&mut writer.inner, &mut self.decoder) { ( BufferInner::Full { @@ -170,33 +155,33 @@ impl DefinitionLevelDecoder for DefinitionLevelBufferDecoder { MaybePacked::Fallback(decoder), ) => { assert_eq!(self.max_level, *max_level); - assert_eq!(range.start + writer.len, nulls.len()); - levels.resize(range.end + writer.len, 0); - - let slice = &mut levels[writer.len..]; - let levels_read = decoder.read_def_levels(slice, range.clone())?; + let start = levels.len(); + let (values_read, levels_read) = decoder.read_def_levels(levels, num_levels)?; nulls.reserve(levels_read); - for i in &slice[range.start..range.start + levels_read] { - nulls.append(i == max_level) + for i in &levels[start..] { + nulls.append(i == max_level); } - Ok(levels_read) + Ok((values_read, levels_read)) } (BufferInner::Mask { nulls }, MaybePacked::Packed(decoder)) => { assert_eq!(self.max_level, 1); - assert_eq!(range.start + writer.len, nulls.len()); - decoder.read(nulls, range.end - range.start) + let start = nulls.len(); + let levels_read = decoder.read(nulls, num_levels)?; + + let values_read = count_set_bits(nulls.as_slice(), start..start + levels_read); + Ok((values_read, levels_read)) } _ => unreachable!("inconsistent null mask"), } } - fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> Result<(usize, usize)> { + fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> { match &mut self.decoder { - MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels, max_def_level), + MaybePacked::Fallback(decoder) => decoder.skip_def_levels(num_levels), MaybePacked::Packed(decoder) => decoder.skip(num_levels), } } diff --git a/parquet/src/arrow/record_reader/mod.rs 
b/parquet/src/arrow/record_reader/mod.rs index 49c69c87e302..7456da053b9c 100644 --- a/parquet/src/arrow/record_reader/mod.rs +++ b/parquet/src/arrow/record_reader/mod.rs @@ -18,7 +18,7 @@ use arrow_buffer::Buffer; use crate::arrow::record_reader::{ - buffer::{BufferQueue, ValuesBuffer}, + buffer::ValuesBuffer, definition_levels::{DefinitionLevelBuffer, DefinitionLevelBufferDecoder}, }; use crate::column::reader::decoder::RepetitionLevelDecoderImpl; @@ -62,28 +62,18 @@ pub struct GenericRecordReader { impl GenericRecordReader where - V: ValuesBuffer + Default, - CV: ColumnValueDecoder, + V: ValuesBuffer, + CV: ColumnValueDecoder, { /// Create a new [`GenericRecordReader`] pub fn new(desc: ColumnDescPtr) -> Self { - Self::new_with_records(desc, V::default()) - } -} - -impl GenericRecordReader -where - V: ValuesBuffer, - CV: ColumnValueDecoder, -{ - pub fn new_with_records(desc: ColumnDescPtr, records: V) -> Self { let def_levels = (desc.max_def_level() > 0) .then(|| DefinitionLevelBuffer::new(&desc, packed_null_mask(&desc))); let rep_levels = (desc.max_rep_level() > 0).then(Vec::new); Self { - values: records, + values: V::default(), def_levels, rep_levels, column_reader: None, @@ -166,22 +156,20 @@ where /// The implementation has side effects. It will create a new buffer to hold those /// definition level values that have already been read into memory but not counted /// as record values, e.g. those from `self.num_values` to `self.values_written`. - pub fn consume_def_levels(&mut self) -> Option { + pub fn consume_def_levels(&mut self) -> Option> { self.def_levels.as_mut().and_then(|x| x.consume_levels()) } /// Return repetition level data. /// The side effect is similar to `consume_def_levels`. - pub fn consume_rep_levels(&mut self) -> Option { - self.rep_levels - .as_mut() - .map(|x| Buffer::from_vec(x.consume())) + pub fn consume_rep_levels(&mut self) -> Option> { + self.rep_levels.as_mut().map(std::mem::take) } /// Returns currently stored buffer data. 
/// The side effect is similar to `consume_def_levels`. - pub fn consume_record_data(&mut self) -> V::Output { - self.values.consume() + pub fn consume_record_data(&mut self) -> V { + std::mem::take(&mut self.values) } /// Returns currently stored null bitmap data. @@ -207,18 +195,13 @@ where /// Try to read one batch of data returning the number of records read fn read_one_batch(&mut self, batch_size: usize) -> Result { - let rep_levels = self - .rep_levels - .as_mut() - .map(|levels| levels.get_output_slice(batch_size)); - let def_levels = self.def_levels.as_mut(); - let values = self.values.get_output_slice(batch_size); - - let (records_read, values_read, levels_read) = self - .column_reader - .as_mut() - .unwrap() - .read_records(batch_size, def_levels, rep_levels, values)?; + let (records_read, values_read, levels_read) = + self.column_reader.as_mut().unwrap().read_records( + batch_size, + self.def_levels.as_mut(), + self.rep_levels.as_mut(), + &mut self.values, + )?; if values_read < levels_read { let def_levels = self.def_levels.as_ref().ok_or_else(|| { @@ -235,13 +218,6 @@ where self.num_records += records_read; self.num_values += levels_read; - self.values.truncate_buffer(self.num_values); - if let Some(ref mut buf) = self.rep_levels { - buf.truncate_buffer(self.num_values) - }; - if let Some(ref mut buf) = self.def_levels { - buf.set_len(self.num_values) - }; Ok(records_read) } } @@ -258,7 +234,6 @@ mod tests { use std::sync::Arc; use arrow::buffer::Buffer; - use arrow_array::builder::Int16BufferBuilder; use crate::basic::Encoding; use crate::data_type::Int32Type; @@ -417,11 +392,8 @@ mod tests { } // Verify result def levels - let mut bb = Int16BufferBuilder::new(7); - bb.append_slice(&[1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]); - let expected_def_levels = bb.finish(); assert_eq!( - Some(expected_def_levels), + Some(vec![1i16, 2i16, 0i16, 2i16, 2i16, 0i16, 2i16]), record_reader.consume_def_levels() ); @@ -526,11 +498,8 @@ mod tests { } // Verify result 
def levels - let mut bb = Int16BufferBuilder::new(9); - bb.append_slice(&[2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]); - let expected_def_levels = bb.finish(); assert_eq!( - Some(expected_def_levels), + Some(vec![2i16, 0i16, 1i16, 2i16, 2i16, 2i16, 2i16, 2i16, 2i16]), record_reader.consume_def_levels() ); @@ -792,11 +761,8 @@ mod tests { } // Verify result def levels - let mut bb = Int16BufferBuilder::new(7); - bb.append_slice(&[0i16, 2i16, 2i16]); - let expected_def_levels = bb.finish(); assert_eq!( - Some(expected_def_levels), + Some(vec![0i16, 2i16, 2i16]), record_reader.consume_def_levels() ); diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs index 6c712ead625c..4f861917c9c5 100644 --- a/parquet/src/column/reader.rs +++ b/parquet/src/column/reader.rs @@ -23,7 +23,7 @@ use super::page::{Page, PageReader}; use crate::basic::*; use crate::column::reader::decoder::{ ColumnValueDecoder, ColumnValueDecoderImpl, DefinitionLevelDecoder, DefinitionLevelDecoderImpl, - LevelsBufferSlice, RepetitionLevelDecoder, RepetitionLevelDecoderImpl, ValuesBufferSlice, + RepetitionLevelDecoder, RepetitionLevelDecoderImpl, }; use crate::data_type::*; use crate::errors::{ParquetError, Result}; @@ -193,9 +193,9 @@ where pub fn read_batch( &mut self, batch_size: usize, - def_levels: Option<&mut D::Slice>, - rep_levels: Option<&mut R::Slice>, - values: &mut V::Slice, + def_levels: Option<&mut D::Buffer>, + rep_levels: Option<&mut R::Buffer>, + values: &mut V::Buffer, ) -> Result<(usize, usize)> { let (_, values, levels) = self.read_records(batch_size, def_levels, rep_levels, values)?; @@ -219,41 +219,26 @@ where pub fn read_records( &mut self, max_records: usize, - mut def_levels: Option<&mut D::Slice>, - mut rep_levels: Option<&mut R::Slice>, - values: &mut V::Slice, + mut def_levels: Option<&mut D::Buffer>, + mut rep_levels: Option<&mut R::Buffer>, + values: &mut V::Buffer, ) -> Result<(usize, usize, usize)> { - let mut max_levels = 
values.capacity().min(max_records); - if let Some(ref levels) = def_levels { - max_levels = max_levels.min(levels.capacity()); - } - if let Some(ref levels) = rep_levels { - max_levels = max_levels.min(levels.capacity()) - } - let mut total_records_read = 0; let mut total_levels_read = 0; let mut total_values_read = 0; - while total_records_read < max_records - && total_levels_read < max_levels - && self.has_next()? - { + while total_records_read < max_records && self.has_next()? { let remaining_records = max_records - total_records_read; let remaining_levels = self.num_buffered_values - self.num_decoded_values; - let levels_to_read = remaining_levels.min(max_levels - total_levels_read); - let (records_read, levels_read) = match self.rep_level_decoder.as_mut() { + let (records_read, levels_to_read) = match self.rep_level_decoder.as_mut() { Some(reader) => { let out = rep_levels .as_mut() .ok_or_else(|| general_err!("must specify repetition levels"))?; - let (mut records_read, levels_read) = reader.read_rep_levels( - out, - total_levels_read..total_levels_read + levels_to_read, - remaining_records, - )?; + let (mut records_read, levels_read) = + reader.read_rep_levels(out, remaining_records, remaining_levels)?; if levels_read == remaining_levels && self.has_record_delimiter { // Reached end of page, which implies records_read < remaining_records @@ -264,7 +249,7 @@ where (records_read, levels_read) } None => { - let min = remaining_records.min(levels_to_read); + let min = remaining_records.min(remaining_levels); (min, min) } }; @@ -275,26 +260,18 @@ where .as_mut() .ok_or_else(|| general_err!("must specify definition levels"))?; - let read = reader - .read_def_levels(out, total_levels_read..total_levels_read + levels_read)?; + let (values_read, levels_read) = reader.read_def_levels(out, levels_to_read)?; - if read != levels_read { + if levels_read != levels_to_read { return Err(general_err!("insufficient definition levels read from column - expected {rep_levels}, 
got {read}")); } - let null_count = out.count_nulls( - total_levels_read..total_levels_read + read, - self.descr.max_def_level(), - ); - levels_read - null_count + values_read } - None => levels_read, + None => levels_to_read, }; - let values_read = self.values_decoder.read( - values, - total_values_read..total_values_read + values_to_read, - )?; + let values_read = self.values_decoder.read(values, values_to_read)?; if values_read != values_to_read { return Err(general_err!( @@ -302,9 +279,9 @@ where )); } - self.num_decoded_values += levels_read; + self.num_decoded_values += levels_to_read; total_records_read += records_read; - total_levels_read += levels_read; + total_levels_read += levels_to_read; total_values_read += values_read; } @@ -389,9 +366,7 @@ where } let (values_read, def_levels_read) = match self.def_level_decoder.as_mut() { - Some(decoder) => { - decoder.skip_def_levels(rep_levels_read, self.descr.max_def_level())? - } + Some(decoder) => decoder.skip_def_levels(rep_levels_read)?, None => (rep_levels_read, rep_levels_read), }; @@ -1016,34 +991,22 @@ mod tests { #[test] fn test_read_batch_values_only() { - test_read_batch_int32(16, &mut [0; 10], None, None); // < batch_size - test_read_batch_int32(16, &mut [0; 16], None, None); // == batch_size - test_read_batch_int32(16, &mut [0; 51], None, None); // > batch_size + test_read_batch_int32(16, 0, 0); } #[test] fn test_read_batch_values_def_levels() { - test_read_batch_int32(16, &mut [0; 10], Some(&mut [0; 10]), None); - test_read_batch_int32(16, &mut [0; 16], Some(&mut [0; 16]), None); - test_read_batch_int32(16, &mut [0; 51], Some(&mut [0; 51]), None); + test_read_batch_int32(16, MAX_DEF_LEVEL, 0); } #[test] fn test_read_batch_values_rep_levels() { - test_read_batch_int32(16, &mut [0; 10], None, Some(&mut [0; 10])); - test_read_batch_int32(16, &mut [0; 16], None, Some(&mut [0; 16])); - test_read_batch_int32(16, &mut [0; 51], None, Some(&mut [0; 51])); - } - - #[test] - fn 
test_read_batch_different_buf_sizes() { - test_read_batch_int32(16, &mut [0; 8], Some(&mut [0; 9]), Some(&mut [0; 7])); - test_read_batch_int32(16, &mut [0; 1], Some(&mut [0; 9]), Some(&mut [0; 3])); + test_read_batch_int32(16, 0, MAX_REP_LEVEL); } #[test] fn test_read_batch_values_def_rep_levels() { - test_read_batch_int32(128, &mut [0; 128], Some(&mut [0; 128]), Some(&mut [0; 128])); + test_read_batch_int32(128, MAX_DEF_LEVEL, MAX_REP_LEVEL); } #[test] @@ -1065,9 +1028,6 @@ mod tests { let num_pages = 2; let num_levels = 4; let batch_size = 5; - let values = &mut vec![0; 7]; - let def_levels = &mut vec![0; 7]; - let rep_levels = &mut vec![0; 7]; let mut tester = ColumnReaderTester::::new(); tester.test_read_batch( @@ -1078,9 +1038,6 @@ mod tests { batch_size, std::i32::MIN, std::i32::MAX, - values, - Some(def_levels), - Some(rep_levels), false, ); } @@ -1153,24 +1110,8 @@ mod tests { // // This is a high level wrapper on `ColumnReaderTester` that allows us to specify some // boilerplate code for setting up definition/repetition levels and column descriptor. 
- fn test_read_batch_int32( - batch_size: usize, - values: &mut [i32], - def_levels: Option<&mut [i16]>, - rep_levels: Option<&mut [i16]>, - ) { + fn test_read_batch_int32(batch_size: usize, max_def_level: i16, max_rep_level: i16) { let primitive_type = get_test_int32_type(); - // make field is required based on provided slices of levels - let max_def_level = if def_levels.is_some() { - MAX_DEF_LEVEL - } else { - 0 - }; - let max_rep_level = if rep_levels.is_some() { - MAX_REP_LEVEL - } else { - 0 - }; let desc = Arc::new(ColumnDescriptor::new( Arc::new(primitive_type), @@ -1178,6 +1119,7 @@ mod tests { max_rep_level, ColumnPath::new(Vec::new()), )); + let mut tester = ColumnReaderTester::::new(); tester.test_read_batch( desc, @@ -1187,9 +1129,6 @@ mod tests { batch_size, i32::MIN, i32::MAX, - values, - def_levels, - rep_levels, false, ); } @@ -1317,21 +1256,8 @@ mod tests { max: T::T, use_v2: bool, ) { - let mut def_levels = vec![0; num_levels * num_pages]; - let mut rep_levels = vec![0; num_levels * num_pages]; - let mut values = vec![T::T::default(); num_levels * num_pages]; self.test_read_batch( - desc, - encoding, - num_pages, - num_levels, - batch_size, - min, - max, - &mut values, - Some(&mut def_levels), - Some(&mut rep_levels), - use_v2, + desc, encoding, num_pages, num_levels, batch_size, min, max, use_v2, ); } @@ -1347,9 +1273,6 @@ mod tests { batch_size: usize, min: T::T, max: T::T, - values: &mut [T::T], - mut def_levels: Option<&mut [i16]>, - mut rep_levels: Option<&mut [i16]>, use_v2: bool, ) { let mut pages = VecDeque::new(); @@ -1372,18 +1295,19 @@ mod tests { let column_reader: ColumnReader = get_column_reader(desc, Box::new(page_reader)); let mut typed_column_reader = get_typed_column_reader::(column_reader); + let mut values = Vec::new(); + let mut def_levels = Vec::new(); + let mut rep_levels = Vec::new(); + let mut curr_values_read = 0; let mut curr_levels_read = 0; loop { - let actual_def_levels = def_levels.as_mut().map(|vec| &mut 
vec[curr_levels_read..]); - let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[curr_levels_read..]); - let (_, values_read, levels_read) = typed_column_reader .read_records( batch_size, - actual_def_levels, - actual_rep_levels, - &mut values[curr_values_read..], + Some(&mut def_levels), + Some(&mut rep_levels), + &mut values, ) .expect("read_batch() should be OK"); @@ -1395,38 +1319,18 @@ mod tests { } } - assert!( - values.len() >= curr_values_read, - "values.len() >= values_read" - ); - assert_eq!( - &values[0..curr_values_read], - &self.values[0..curr_values_read], - "values content doesn't match" - ); + assert_eq!(values, self.values, "values content doesn't match"); if max_def_level > 0 { - let levels = def_levels.as_ref().unwrap(); - assert!( - levels.len() >= curr_levels_read, - "def_levels.len() >= levels_read" - ); assert_eq!( - &levels[0..curr_levels_read], - &self.def_levels[0..curr_levels_read], + def_levels, self.def_levels, "definition levels content doesn't match" ); } if max_rep_level > 0 { - let levels = rep_levels.as_ref().unwrap(); - assert!( - levels.len() >= curr_levels_read, - "rep_levels.len() >= levels_read" - ); assert_eq!( - &levels[0..curr_levels_read], - &self.rep_levels[0..curr_levels_read], + rep_levels, self.rep_levels, "repetition levels content doesn't match" ); } diff --git a/parquet/src/column/reader/decoder.rs b/parquet/src/column/reader/decoder.rs index ef62724689a8..9889973b6721 100644 --- a/parquet/src/column/reader/decoder.rs +++ b/parquet/src/column/reader/decoder.rs @@ -16,7 +16,6 @@ // under the License. 
use std::collections::HashMap; -use std::ops::Range; use bytes::Bytes; @@ -30,52 +29,18 @@ use crate::errors::{ParquetError, Result}; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::{num_required_bits, BitReader}; -/// A slice of levels buffer data that is written to by a [`ColumnLevelDecoder`] -pub trait LevelsBufferSlice { - /// Returns the capacity of this slice or `usize::MAX` if no limit - fn capacity(&self) -> usize; - - /// Count the number of levels in `range` not equal to `max_level` - fn count_nulls(&self, range: Range, max_level: i16) -> usize; - } - -impl LevelsBufferSlice for [i16] { - fn capacity(&self) -> usize { - self.len() - } - - fn count_nulls(&self, range: Range, max_level: i16) -> usize { - self[range].iter().filter(|i| **i != max_level).count() - } -} - -/// A slice of values buffer data that is written to by a [`ColumnValueDecoder`] -pub trait ValuesBufferSlice { - /// Returns the capacity of this slice or `usize::MAX` if no limit - fn capacity(&self) -> usize; -} - -impl ValuesBufferSlice for [T] { - fn capacity(&self) -> usize { - self.len() - } -} - -/// Decodes level data to a [`LevelsBufferSlice`] +/// Decodes level data pub trait ColumnLevelDecoder { - type Slice: LevelsBufferSlice + ?Sized; + type Buffer; /// Set data for this [`ColumnLevelDecoder`] fn set_data(&mut self, encoding: Encoding, data: Bytes); } pub trait RepetitionLevelDecoder: ColumnLevelDecoder { - /// Read up to `max_records` of repetition level data into `out[range]` returning the number + /// Read up to `num_records` of repetition level data into `out` returning the number /// of complete records and levels read /// - /// `range` is provided by the caller to allow for types such as default-initialized `[T]` - /// that only track capacity and not length - /// /// A record only ends when the data contains a subsequent repetition level of 0, /// it is therefore left to the caller to delimit the final record in a column /// @@ -84,9 +49,9 @@ pub trait 
RepetitionLevelDecoder: ColumnLevelDecoder { /// Implementations may panic if `range` overlaps with already written data fn read_rep_levels( &mut self, - out: &mut Self::Slice, - range: Range, - max_records: usize, + out: &mut Self::Buffer, + num_records: usize, + num_levels: usize, ) -> Result<(usize, usize)>; /// Skips over up to `num_levels` repetition levels corresponding to `num_records` records, @@ -103,27 +68,28 @@ pub trait RepetitionLevelDecoder: ColumnLevelDecoder { } pub trait DefinitionLevelDecoder: ColumnLevelDecoder { - /// Read definition level data into `out[range]` returning the number of levels read + /// Read up to `num_levels` definition levels into `out` /// - /// `range` is provided by the caller to allow for types such as default-initialized `[T]` - /// that only track capacity and not length + /// Returns the number of values read, and the number of levels read /// /// # Panics /// /// Implementations may panic if `range` overlaps with already written data /// - // TODO: Should this return the number of nulls - fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range) -> Result; + fn read_def_levels( + &mut self, + out: &mut Self::Buffer, + num_levels: usize, + ) -> Result<(usize, usize)>; /// Skips over `num_levels` definition levels /// /// Returns the number of values skipped, and the number of levels skipped - fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> Result<(usize, usize)>; + fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)>; } -/// Decodes value data to a [`ValuesBufferSlice`] +/// Decodes value data pub trait ColumnValueDecoder { - type Slice: ValuesBufferSlice + ?Sized; + type Buffer; /// Create a new [`ColumnValueDecoder`] fn new(col: &ColumnDescPtr) -> Self; @@ -156,16 +122,13 @@ pub trait ColumnValueDecoder { num_values: Option, ) -> Result<()>; - /// Read values data into `out[range]` returning the number of values read - /// - /// `range` is provided by the
caller to allow for types such as default-initialized `[T]` - /// that only track capacity and not length + /// Read up to `num_values` values into `out` /// /// # Panics /// /// Implementations may panic if `range` overlaps with already written data /// - fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result; + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result; /// Skips over `num_values` values /// @@ -184,7 +147,7 @@ pub struct ColumnValueDecoderImpl { } impl ColumnValueDecoder for ColumnValueDecoderImpl { - type Slice = [T::T]; + type Buffer = Vec; fn new(descr: &ColumnDescPtr) -> Self { Self { @@ -258,7 +221,7 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { Ok(()) } - fn read(&mut self, out: &mut Self::Slice, range: Range) -> Result { + fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result { let encoding = self .current_encoding .expect("current_encoding should be set"); @@ -268,7 +231,12 @@ impl ColumnValueDecoder for ColumnValueDecoderImpl { .get_mut(&encoding) .unwrap_or_else(|| panic!("decoder for encoding {encoding} should be set")); - current_decoder.get(&mut out[range]) + // TODO: Push vec into decoder (#5177) + let start = out.len(); + out.resize(start + num_values, T::T::default()); + let read = current_decoder.get(&mut out[start..])?; + out.truncate(start + read); + Ok(read) } fn skip_values(&mut self, num_values: usize) -> Result { @@ -319,6 +287,7 @@ impl LevelDecoder { pub struct DefinitionLevelDecoderImpl { decoder: Option, bit_width: u8, + max_level: i16, } impl DefinitionLevelDecoderImpl { @@ -327,12 +296,13 @@ impl DefinitionLevelDecoderImpl { Self { decoder: None, bit_width, + max_level, } } } impl ColumnLevelDecoder for DefinitionLevelDecoderImpl { - type Slice = [i16]; + type Buffer = Vec; fn set_data(&mut self, encoding: Encoding, data: Bytes) { self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)) @@ -340,11 +310,23 @@ impl ColumnLevelDecoder for 
DefinitionLevelDecoderImpl { } impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl { - fn read_def_levels(&mut self, out: &mut Self::Slice, range: Range) -> Result { - self.decoder.as_mut().unwrap().read(&mut out[range]) + fn read_def_levels( + &mut self, + out: &mut Self::Buffer, + num_levels: usize, + ) -> Result<(usize, usize)> { + // TODO: Push vec into decoder (#5177) + let start = out.len(); + out.resize(start + num_levels, 0); + let levels_read = self.decoder.as_mut().unwrap().read(&mut out[start..])?; + out.truncate(start + levels_read); + + let iter = out.iter().skip(start); + let values_read = iter.filter(|x| **x == self.max_level).count(); + Ok((values_read, levels_read)) } - fn skip_def_levels(&mut self, num_levels: usize, max_def_level: i16) -> Result<(usize, usize)> { + fn skip_def_levels(&mut self, num_levels: usize) -> Result<(usize, usize)> { let mut level_skip = 0; let mut value_skip = 0; let mut buf: Vec = vec![]; @@ -353,14 +335,14 @@ impl DefinitionLevelDecoder for DefinitionLevelDecoderImpl { let to_read = remaining_levels.min(SKIP_BUFFER_SIZE); buf.resize(to_read, 0); - let read = self.read_def_levels(&mut buf, 0..to_read)?; - if read == 0 { + let (values_read, levels_read) = self.read_def_levels(&mut buf, to_read)?; + if levels_read == 0 { // Reached end of page break; } - level_skip += read; - value_skip += buf[..read].iter().filter(|x| **x == max_def_level).count(); + level_skip += levels_read; + value_skip += values_read; } Ok((value_skip, level_skip)) @@ -423,7 +405,7 @@ impl RepetitionLevelDecoderImpl { } impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { - type Slice = [i16]; + type Buffer = Vec; fn set_data(&mut self, encoding: Encoding, data: Bytes) { self.decoder = Some(LevelDecoder::new(encoding, data, self.bit_width)); @@ -435,16 +417,14 @@ impl ColumnLevelDecoder for RepetitionLevelDecoderImpl { impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl { fn read_rep_levels( &mut self, - out: &mut Self::Slice, - 
range: Range, - max_records: usize, + out: &mut Self::Buffer, + num_records: usize, + num_levels: usize, ) -> Result<(usize, usize)> { - let output = &mut out[range]; - let max_levels = output.len(); let mut total_records_read = 0; let mut total_levels_read = 0; - while total_records_read < max_records && total_levels_read < max_levels { + while total_records_read < num_records && total_levels_read < num_levels { if self.buffer_len == self.buffer_offset { self.fill_buf()?; if self.buffer_len == 0 { @@ -453,11 +433,11 @@ impl RepetitionLevelDecoder for RepetitionLevelDecoderImpl { } let (partial, records_read, levels_read) = self.count_records( - max_records - total_records_read, - max_levels - total_levels_read, + num_records - total_records_read, + num_levels - total_levels_read, ); - output[total_levels_read..total_levels_read + levels_read].copy_from_slice( + out.extend_from_slice( &self.buffer[self.buffer_offset..self.buffer_offset + levels_read], ); @@ -550,13 +530,13 @@ mod tests { let (records_read, levels_read) = if skip { decoder.skip_rep_levels(records, remaining_levels).unwrap() } else { - let mut decoded = vec![0; remaining_levels]; + let mut decoded = Vec::new(); let (records_read, levels_read) = decoder - .read_rep_levels(&mut decoded, 0..remaining_levels, records) + .read_rep_levels(&mut decoded, records, remaining_levels) .unwrap(); assert_eq!( - decoded[..levels_read], + decoded, encoded[encoded.len() - remaining_levels..][..levels_read] ); (records_read, levels_read) diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 531af4bd461e..e5c4b71d12e0 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -3308,19 +3308,20 @@ mod tests { ) .unwrap(), ); - let reader = get_test_column_reader::(page_reader, max_def_level, max_rep_level); - - let mut actual_values = vec![T::T::default(); max_batch_size]; - let mut actual_def_levels = def_levels.map(|_| vec![0i16; max_batch_size]); - let mut 
actual_rep_levels = rep_levels.map(|_| vec![0i16; max_batch_size]); - - let (_, values_read, levels_read) = read_fully( - reader, - max_batch_size, - actual_def_levels.as_mut(), - actual_rep_levels.as_mut(), - actual_values.as_mut_slice(), - ); + let mut reader = get_test_column_reader::(page_reader, max_def_level, max_rep_level); + + let mut actual_values = Vec::with_capacity(max_batch_size); + let mut actual_def_levels = def_levels.map(|_| Vec::with_capacity(max_batch_size)); + let mut actual_rep_levels = rep_levels.map(|_| Vec::with_capacity(max_batch_size)); + + let (_, values_read, levels_read) = reader + .read_records( + max_batch_size, + actual_def_levels.as_mut(), + actual_rep_levels.as_mut(), + &mut actual_values, + ) + .unwrap(); // Assert values, definition and repetition levels. @@ -3383,22 +3384,6 @@ mod tests { assert_eq!(meta.encodings(), &encodings); } - /// Reads one batch of data, considering that batch is large enough to capture all of - /// the values and levels. - fn read_fully( - mut reader: ColumnReaderImpl, - batch_size: usize, - mut def_levels: Option<&mut Vec>, - mut rep_levels: Option<&mut Vec>, - values: &mut [T::T], - ) -> (usize, usize, usize) { - let actual_def_levels = def_levels.as_mut().map(|vec| &mut vec[..]); - let actual_rep_levels = rep_levels.as_mut().map(|vec| &mut vec[..]); - reader - .read_records(batch_size, actual_def_levels, actual_rep_levels, values) - .unwrap() - } - /// Returns column writer. 
fn get_test_column_writer<'a, T: DataType>( page_writer: Box, diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index fbb172d3b3c2..468433f31dd1 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -1739,8 +1739,8 @@ mod tests { let row_group_reader = reader.get_row_group(0).unwrap(); match row_group_reader.get_column_reader(0).unwrap() { ColumnReader::Int64ColumnReader(mut reader) => { - let mut buffer = [0; 1024]; - let mut def_levels = [0; 1024]; + let mut buffer = Vec::with_capacity(1024); + let mut def_levels = Vec::with_capacity(1024); let (num_records, num_values, num_levels) = reader .read_records(1024, Some(&mut def_levels), None, &mut buffer) .unwrap(); @@ -1750,7 +1750,7 @@ mod tests { assert_eq!(num_levels, 513); let expected: Vec = (1..514).collect(); - assert_eq!(&buffer[..513], &expected); + assert_eq!(&buffer, &expected); } _ => unreachable!(), } diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index f0b75f302552..3534bab81691 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -1700,12 +1700,14 @@ mod tests { let test_read = |reader: SerializedFileReader| { let row_group = reader.get_row_group(0).unwrap(); - let mut out = [0; 4]; + let mut out = Vec::with_capacity(4); let c1 = row_group.get_column_reader(0).unwrap(); let mut c1 = get_typed_column_reader::(c1); c1.read_records(4, None, None, &mut out).unwrap(); assert_eq!(out, column_data[0]); + out.clear(); + let c2 = row_group.get_column_reader(1).unwrap(); let mut c2 = get_typed_column_reader::(c2); c2.read_records(4, None, None, &mut out).unwrap(); diff --git a/parquet/src/record/triplet.rs b/parquet/src/record/triplet.rs index 7647b23e28d7..902641c08bcf 100644 --- a/parquet/src/record/triplet.rs +++ b/parquet/src/record/triplet.rs @@ -296,17 +296,19 @@ impl TypedTripletIter { // and therefore not advance `self.triplets_left` while self.curr_triplet_index >= 
self.triplets_left { let (records_read, values_read, levels_read) = { - // Get slice of definition levels, if available - let def_levels = self.def_levels.as_mut().map(|vec| &mut vec[..]); - - // Get slice of repetition levels, if available - let rep_levels = self.rep_levels.as_mut().map(|vec| &mut vec[..]); + self.values.clear(); + if let Some(x) = &mut self.def_levels { + x.clear() + } + if let Some(x) = &mut self.rep_levels { + x.clear() + } // Buffer triplets self.reader.read_records( self.batch_size, - def_levels, - rep_levels, + self.def_levels.as_mut(), + self.rep_levels.as_mut(), &mut self.values, )? }; @@ -333,6 +335,7 @@ impl TypedTripletIter { // Note: if values_read == 0, then spacing will not be triggered let mut idx = values_read; let def_levels = self.def_levels.as_ref().unwrap(); + self.values.resize(levels_read, T::T::default()); for i in 0..levels_read { if def_levels[levels_read - i - 1] == self.max_def_level { idx -= 1; // This is done to avoid usize becoming a negative value From 8f23a32cb3d9282ea8982a6a15ad876e52e3df88 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Fri, 8 Dec 2023 18:28:26 +0000 Subject: [PATCH 2/2] Update parquet_derive --- parquet_derive/src/parquet_field.rs | 48 ++++++----------------------- 1 file changed, 10 insertions(+), 38 deletions(-) diff --git a/parquet_derive/src/parquet_field.rs b/parquet_derive/src/parquet_field.rs index bb33b3196855..8d759d11c4bc 100644 --- a/parquet_derive/src/parquet_field.rs +++ b/parquet_derive/src/parquet_field.rs @@ -243,15 +243,12 @@ impl Field { pub fn reader_snippet(&self) -> proc_macro2::TokenStream { let ident = &self.ident; let column_reader = self.ty.column_reader(); - let parquet_type = self.ty.physical_type_as_rust(); // generate the code to read the column into a vector `vals` let write_batch_expr = quote! 
{ - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut [#parquet_type] = vals_vec.as_mut_slice(); + let mut vals = Vec::new(); if let #column_reader(mut typed) = column_reader { - typed.read_records(num_records, None, None, vals)?; + typed.read_records(num_records, None, None, &mut vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{#ident}); } @@ -646,23 +643,6 @@ impl Type { } } - fn physical_type_as_rust(&self) -> proc_macro2::TokenStream { - use parquet::basic::Type as BasicType; - - match self.physical_type() { - BasicType::BOOLEAN => quote! { bool }, - BasicType::INT32 => quote! { i32 }, - BasicType::INT64 => quote! { i64 }, - BasicType::INT96 => unimplemented!("96-bit int currently is not supported"), - BasicType::FLOAT => quote! { f32 }, - BasicType::DOUBLE => quote! { f64 }, - BasicType::BYTE_ARRAY => quote! { ::parquet::data_type::ByteArray }, - BasicType::FIXED_LEN_BYTE_ARRAY => { - quote! { ::parquet::data_type::FixedLenByteArray } - } - } - } - fn logical_type(&self) -> proc_macro2::TokenStream { let last_part = self.last_part(); let leaf_type = self.leaf_type_recursive(); @@ -877,11 +857,9 @@ mod test { snippet, (quote! 
{ { - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut[i64] = vals_vec.as_mut_slice(); + let mut vals = Vec::new(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, vals)?; + typed.read_records(num_records, None, None, &mut vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ counter }); } @@ -1256,11 +1234,9 @@ mod test { let when = Field::from(&fields[0]); assert_eq!(when.reader_snippet().to_string(),(quote!{ { - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut[i64] = vals_vec.as_mut_slice(); + let mut vals = Vec::new(); if let ColumnReader::Int64ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, vals)?; + typed.read_records(num_records, None, None, &mut vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } @@ -1326,11 +1302,9 @@ mod test { let when = Field::from(&fields[0]); assert_eq!(when.reader_snippet().to_string(),(quote!{ { - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut [i32] = vals_vec.as_mut_slice(); + let mut vals = Vec::new(); if let ColumnReader::Int32ColumnReader(mut typed) = column_reader { - typed.read_records(num_records, None, None, vals)?; + typed.read_records(num_records, None, None, &mut vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ henceforth }); } @@ -1396,11 +1370,9 @@ mod test { let when = Field::from(&fields[0]); assert_eq!(when.reader_snippet().to_string(),(quote!{ { - let mut vals_vec = Vec::new(); - vals_vec.resize(num_records, Default::default()); - let mut vals: &mut [::parquet::data_type::ByteArray] = vals_vec.as_mut_slice(); + let mut vals = Vec::new(); if let ColumnReader::ByteArrayColumnReader(mut typed) = column_reader { - typed.read_records(num_records, 
None, None, vals)?; + typed.read_records(num_records, None, None, &mut vals)?; } else { panic!("Schema and struct disagree on type for {}", stringify!{ unique_id }); }