Skip to content

Commit

Permalink
Revert "use different buffer for different encoding"
Browse files Browse the repository at this point in the history
This reverts commit 71f51aa.
  • Loading branch information
ariesdevil committed Apr 14, 2024
1 parent 71f51aa commit fef3c9f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 69 deletions.
11 changes: 6 additions & 5 deletions parquet/src/arrow/array_reader/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ impl ByteViewArrayDecoderPlain {
let mut read = 0;

let buf = self.buf.as_ref();
output.add_buffer(self.buf.clone());
while self.offset < self.buf.len() && read != to_read {
if self.offset + 4 > buf.len() {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
Expand All @@ -341,7 +340,7 @@ impl ByteViewArrayDecoderPlain {
return Err(ParquetError::EOF("eof decoding byte view array".into()));
}

output.try_push_with_offset(start_offset, end_offset)?;
output.try_push(&buf[start_offset..end_offset], self.validate_utf8)?;

self.offset = end_offset;
read += 1;
Expand Down Expand Up @@ -417,10 +416,12 @@ impl ByteViewArrayDecoderDeltaLength {
}

let mut start_offset = self.data_offset;
output.add_buffer(self.data.clone());
for length in src_lengths {
let end_offset = start_offset + *length as usize;
output.try_push_with_offset(start_offset, end_offset)?;
output.try_push(
&self.data.as_ref()[start_offset..end_offset],
self.validate_utf8,
)?;
start_offset = end_offset;
}

Expand Down Expand Up @@ -498,7 +499,7 @@ impl ByteViewArrayDecoderDictionary {
}

self.decoder.read(len, |keys| {
output.extend_from_dictionary(keys, &dict.views, dict.plain_buffer.as_ref().unwrap())
output.extend_from_dictionary(keys, &dict.views, &dict.buffer)
})
}

Expand Down
75 changes: 11 additions & 64 deletions parquet/src/arrow/buffer/view_buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,13 @@ use arrow_array::{make_array, ArrayRef};
use arrow_buffer::{ArrowNativeType, Buffer, ToByteSlice};
use arrow_data::{ArrayDataBuilder, ByteView};
use arrow_schema::DataType as ArrowType;
use bytes::Bytes;

/// A buffer of variable-sized byte arrays that can be converted into
/// a corresponding [`ArrayRef`]
#[derive(Debug, Default)]
pub struct ViewBuffer {
pub views: Vec<u128>,
/// If encoding in (`PLAIN`, `DELTA_LENGTH_BYTE_ARRAY`), we use `plain_buffer`
/// to hold the page data without copy.
pub plain_buffer: Option<Bytes>,
/// If encoding is `DELTA_BYTE_ARRAY`, we use `delta_buffer` to build data buffer
/// since this encoding's page data not hold full data.
///
/// If encoding in (`PLAIN_DICTIONARY`, `RLE_DICTIONARY`), we need these two buffers
/// cause these encoding first build dict then use dict to read data.
pub delta_buffer: Vec<u8>,
pub buffer: Vec<u8>,
}

impl ViewBuffer {
Expand All @@ -50,36 +41,6 @@ impl ViewBuffer {
self.len() == 0
}

/// add entire page buf to [`ViewBuffer`], avoid copy data.
pub fn add_buffer(&mut self, buf: Bytes) {
if self.plain_buffer.is_none() {
self.plain_buffer = Some(buf);
}
}

/// Push data to [`ViewBuffer`], since we already hold full data through [`Self::add_buffer`],
/// we only need to slice the data to build the view.
pub fn try_push_with_offset(&mut self, start_offset: usize, end_offset: usize) -> Result<()> {
let data = &self.plain_buffer.as_ref().unwrap()[start_offset..end_offset];
let length: u32 = (end_offset - start_offset) as u32;
if length <= 12 {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + length as usize].copy_from_slice(data);
self.views.push(u128::from_le_bytes(view_buffer));
return Ok(());
}

let view = ByteView {
length,
prefix: u32::from_le_bytes(data[0..4].try_into().unwrap()),
buffer_index: 0,
offset: start_offset as u32,
};
self.views.push(view.into());
Ok(())
}

/// If `validate_utf8` this verifies that the first character of `data` is
/// the start of a UTF-8 codepoint
///
Expand Down Expand Up @@ -107,8 +68,8 @@ impl ViewBuffer {
return Ok(());
}

let offset = self.delta_buffer.len() as u32;
self.delta_buffer.extend_from_slice(data);
let offset = self.buffer.len() as u32;
self.buffer.extend_from_slice(data);

let view = ByteView {
length,
Expand Down Expand Up @@ -168,34 +129,20 @@ impl ViewBuffer {
return Ok(());
}
let first_buffer_offset = ((*first_buffer.unwrap()) >> 96) as u32 as usize;
if self.plain_buffer.is_none() {
match std::str::from_utf8(&self.delta_buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
} else {
match std::str::from_utf8(&self.plain_buffer.as_ref().unwrap()[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
match std::str::from_utf8(&self.buffer[first_buffer_offset..]) {
Ok(_) => Ok(()),
Err(e) => Err(general_err!("encountered non UTF-8 data: {}", e)),
}
}

/// Converts this into an [`ArrayRef`] with the provided `data_type` and `null_buffer`
pub fn into_array(self, null_buffer: Option<Buffer>, data_type: ArrowType) -> ArrayRef {
let len = self.len();
let array_data_builder = {
let builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.null_bit_buffer(null_buffer);

if self.plain_buffer.is_none() {
builder.add_buffer(Buffer::from_vec(self.delta_buffer))
} else {
builder.add_buffer(self.plain_buffer.unwrap().into())
}
};
let array_data_builder = ArrayDataBuilder::new(data_type)
.len(len)
.add_buffer(Buffer::from_vec(self.views))
.add_buffer(Buffer::from_vec(self.buffer))
.null_bit_buffer(null_buffer);

let data = match cfg!(debug_assertions) {
true => array_data_builder.build().unwrap(),
Expand Down

0 comments on commit fef3c9f

Please sign in to comment.