diff --git a/parquet/src/arrow/array_reader/byte_view_array.rs b/parquet/src/arrow/array_reader/byte_view_array.rs index 1c27a3b58b2..156cb0c5e47 100644 --- a/parquet/src/arrow/array_reader/byte_view_array.rs +++ b/parquet/src/arrow/array_reader/byte_view_array.rs @@ -327,6 +327,7 @@ impl ByteViewArrayDecoderPlain { let mut read = 0; let buf = self.buf.as_ref(); + output.add_buffer(self.buf.clone()); while self.offset < self.buf.len() && read != to_read { if self.offset + 4 > buf.len() { return Err(ParquetError::EOF("eof decoding byte view array".into())); @@ -340,7 +341,7 @@ impl ByteViewArrayDecoderPlain { return Err(ParquetError::EOF("eof decoding byte view array".into())); } - output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?; + output.try_push_with_offset(start_offset, end_offset)?; self.offset = end_offset; read += 1; @@ -416,12 +417,10 @@ impl ByteViewArrayDecoderDeltaLength { } let mut start_offset = self.data_offset; + output.add_buffer(self.data.clone()); for length in src_lengths { let end_offset = start_offset + *length as usize; - output.try_push( - &self.data.as_ref()[start_offset..end_offset], - self.validate_utf8, - )?; + output.try_push_with_offset(start_offset, end_offset)?; start_offset = end_offset; } @@ -499,7 +498,7 @@ impl ByteViewArrayDecoderDictionary { } self.decoder.read(len, |keys| { - output.extend_from_dictionary(keys, &dict.views, &dict.buffer) + output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap()) }) } diff --git a/parquet/src/arrow/buffer/view_buffer.rs b/parquet/src/arrow/buffer/view_buffer.rs index 743ee6cf288..784a67c2e1c 100644 --- a/parquet/src/arrow/buffer/view_buffer.rs +++ b/parquet/src/arrow/buffer/view_buffer.rs @@ -22,13 +22,22 @@ use arrow_array::{make_array, ArrayRef}; use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice}; use arrow_data::{ArrayDataBuilder, ByteView}; use arrow_schema::DataType as ArrowType; +use bytes::Bytes; /// A buffer of variable-sized byte arrays that can be converted into /// a corresponding [`ArrayRef`] #[derive(Debug, Default)] pub struct ViewBuffer { pub views: Vec, - pub buffer: Vec, + /// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer` + /// to hold the page data without copy. + pub plain_buffer: Option, + /// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer + /// since this encoding's page data not hold full data. + /// + /// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers + /// cause these encoding first build dict then use dict to read data. + pub delta_buffer: Vec, } impl ViewBuffer { @@ -41,6 +50,36 @@ impl ViewBuffer { self.len() == 0 } + /// add entire page buf to [`ViewBuffer`], avoid copy data. + pub fn add_buffer(&mut self, buf: Bytes) { + if self.plain_buffer.is_none() { + self.plain_buffer = Some(buf); + } + } + + /// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`], + /// we only need to slice the data to build the view. + pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> { + let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset]; + let length: u32 = (end_offset - start_offset) as u32; + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + length as usize].copy_from_slice(data); + self.views.push(u128::from_le_bytes(view_buffer)); + return Ok(()); + } + + let view = ByteView { + length, + prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()), + buffer_index: 0, + offset: start_offset as u32, + }; + self.views.push(view.into()); + Ok(()) + } + /// If `validate_utf8` this verifies that the first character of `data` is /// the start of a UTF-8 codepoint /// @@ -68,8 +107,8 @@ impl ViewBuffer { return Ok(()); } - let offset = self.buffer.len() as u32; - self.buffer.extend_from_slice(data); + let offset = self.delta_buffer.len() as u32; + self.delta_buffer.extend_from_slice(data); let view = ByteView { length, @@ -129,20 +168,34 @@ impl ViewBuffer { return Ok(()); } let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize; - match std::str::from_utf8(&self.buffer[first_buffer_offset..]) { - Ok(_) => Ok(()), - Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + if self.plain_buffer.is_none() { + match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } + } else { + match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) { + Ok(_) => Ok(()), + Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)), + } } } /// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer` pub fn into_array(self, null_buffer: Option, data_type: ArrowType) -> ArrayRef { let len = self.len(); - let array_data_builder = ArrayDataBuilder::new(data_type) - .len(len) - .add_buffer(Buffer::from_vec(self.views)) - .add_buffer(Buffer::from_vec(self.buffer)) - .null_bit_buffer(null_buffer); + let array_data_builder = { + let builder = ArrayDataBuilder::new(data_type) + .len(len) + .add_buffer(Buffer::from_vec(self.views)) + .null_bit_buffer(null_buffer); + + if self.plain_buffer.is_none() { + builder.add_buffer(Buffer::from_vec(self.delta_buffer)) + } else { + builder.add_buffer(self.plain_buffer.unwrap().into()) + } + }; let data = match cfg!(debug_assertions) { true => array_data_builder.build().unwrap(),