Skip to content

Commit

Permalink
use different buffer for different encoding
Browse files Browse the repository at this point in the history
  • Loading branch information
ariesdevil committed Apr 14, 2024
1 parent 288fedf commit 71f51aa
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 17 deletions.
11 changes: 5 additions & 6 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ impl ByteViewArrayDecoderPlain {
let mut read = 0;

let buf = self.buf.as_ref();
output.add_buffer(self.buf.clone());
while self.offset < self.buf.len() && read != to_read {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
Expand All @@ -340,7 +341,7 @@ impl ByteViewArrayDecoderPlain {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
}

output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;
output.try_push_with_offset(start_offset, end_offset)?;

self.offset = end_offset;
read += 1;
Expand Down Expand Up @@ -416,12 +417,10 @@ impl ByteViewArrayDecoderDeltaLength {
}

let mut start_offset = self.data_offset;
output.add_buffer(self.data.clone());
for length in src_lengths {
let end_offset = start_offset + *length as usize;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
self.validate_utf8,
)?;
output.try_push_with_offset(start_offset, end_offset)?;
start_offset = end_offset;
}

Expand Down Expand Up @@ -499,7 +498,7 @@ impl ByteViewArrayDecoderDictionary {
}

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, &dict.views, &dict.buffer)
output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap())
})
}

Expand Down
75 changes: 64 additions & 11 deletions parquet/src/arrow/buffer/view_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@ use arrow_array::{make_array, ArrayRef};
use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
use arrow_data::{ArrayDataBuilder, ByteView};
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
#[derive(Debug, Default)]
pub struct ViewBuffer {
pub views: Vec<u128>,
pub buffer: Vec<u8>,
/// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer`
/// to hold the page data without copy.
pub plain_buffer: Option<Bytes>,
/// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer
/// since this encoding's page data not hold full data.
///
/// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers
/// cause these encoding first build dict then use dict to read data.
pub delta_buffer: Vec<u8>,
}

impl ViewBuffer {
Expand All @@ -41,6 +50,36 @@ impl ViewBuffer {
self.len() == 0
}

/// add entire page buf to [`ViewBuffer`], avoid copy data.
pub fn add_buffer(&mut self, buf: Bytes) {
if self.plain_buffer.is_none() {
self.plain_buffer = Some(buf);
}
}

/// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`],
/// we only need to slice the data to build the view.
pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> {
let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset];
let length: u32 = (end_offset - start_offset) as u32;
if length <= 12 {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + length as usize].copy_from_slice(data);
self.views.push(u128::from_le_bytes(view_buffer));
return Ok(());
}

let view = ByteView {
length,
prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
buffer_index: 0,
offset: start_offset as u32,
};
self.views.push(view.into());
Ok(())
}

/// If `validate_utf8` this verifies that the first character of `data` is
/// the start of a UTF-8 codepoint
///
Expand Down Expand Up @@ -68,8 +107,8 @@ impl ViewBuffer {
return Ok(());
}

let offset = self.buffer.len() as u32;
self.buffer.extend_from_slice(data);
let offset = self.delta_buffer.len() as u32;
self.delta_buffer.extend_from_slice(data);

let view = ByteView {
length,
Expand Down Expand Up @@ -129,20 +168,34 @@ impl ViewBuffer {
return Ok(());
}
let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize;
match std::str::from_utf8(&self.buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
if self.plain_buffer.is_none() {
match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
} else {
match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
}
}

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
let len = self.len();
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.add_buffer(Buffer::from_vec(self.buffer))
.null_bit_buffer(null_buffer);
let array_data_builder = {
let builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.null_bit_buffer(null_buffer);

if self.plain_buffer.is_none() {
builder.add_buffer(Buffer::from_vec(self.delta_buffer))
} else {
builder.add_buffer(self.plain_buffer.unwrap().into())
}
};

let data = match cfg!(debug_assertions) {
true => array_data_builder.build().unwrap(),
Expand Down

0 comments on commit 71f51aa

Please sign in to comment.