support slice view data on writing ipc
ariesdevil committed Apr 9, 2024
1 parent 51c1b4b commit 3a39853
Showing 1 changed file with 65 additions and 8 deletions.
73 changes: 65 additions & 8 deletions arrow-ipc/src/writer.rs
@@ -580,7 +580,12 @@ fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
DataType::BinaryView | DataType::Utf8View => {
// The spec documents that the counts include only the variadic buffers, not the view/null buffers.
// https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
counts.push(array.buffers().len() as i64 - 1);
let views = array.buffers()[0].typed_data::<u128>();
if views.iter().any(|view| *view as u32 > 12) {
counts.push(array.buffers().len() as i64 - 1);
} else {
counts.push(0);
}
}
DataType::Dictionary(_, _) => {
// Do nothing
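
Aside (not part of the commit): the `*view as u32 > 12` check above assumes the ByteView encoding from the Arrow columnar format, where the low 32 bits of each 16-byte view hold the value length and values of 12 bytes or fewer are stored inline, so only longer values reference a variadic data buffer. A minimal standalone sketch of that assumption:

```rust
// Illustrative only; plain u128 values stand in for views so this compiles
// without the arrow crates. Layout assumed (per the Arrow columnar spec):
//   bits  0..32   length
//   bits 32..64   4-byte prefix
//   bits 64..96   buffer index (meaningful only when length > 12)
//   bits 96..128  offset into that buffer
fn view_needs_data_buffer(view: u128) -> bool {
    (view as u32) > 12 // low 32 bits are the value length
}

fn main() {
    let inline_view: u128 = 5; // 5-byte value, fully inlined in the view
    let long_view: u128 = 20 | (3u128 << 64) | (100u128 << 96); // 20 bytes in buffer 3 at offset 100
    assert!(!view_needs_data_buffer(inline_view));
    assert!(view_needs_data_buffer(long_view));
}
```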
@@ -1245,6 +1250,63 @@ fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Arra
(offsets, child_data)
}

fn update_buffer_index(value: &mut u128, new_buffer_index: u32) {
// Keep the length, prefix and offset; clear the old buffer_index. See [`ByteView`] for details.
let mask = !((0xFFFF_FFFFu128) << 64);
*value &= mask;

// shift the new buffer index into its position (bits 64..96)
let new_buffer_index = (new_buffer_index as u128) << 64;

// update value with new buffer index
*value |= new_buffer_index;
}

fn select_data_buffers(mut views_slice: Vec<u128>, data: &ArrayData) -> Vec<Buffer> {
let first_buffer = views_slice.iter().find(|view| (**view) as u32 > 12);
// All values are 12 bytes or shorter, so they are inlined in the views and no data buffers are referenced.
if first_buffer.is_none() {
return vec![Buffer::from_vec(views_slice)];
}
let first_buffer_index = ((*first_buffer.unwrap()) >> 64) as u32 as usize;

let last_buffer = views_slice
.iter()
.rfind(|view| (**view) as u32 > 12)
.unwrap();
let last_buffer_index = ((*last_buffer) >> 64) as u32 as usize;

let data_buffers = &data.buffers()[1..];
let sliced_data_buffers = &data_buffers[first_buffer_index..last_buffer_index + 1];

// If the first buffer index is not 0, re-map each view's buffer index into the sliced data buffers.
if first_buffer_index != 0 {
views_slice
.iter_mut()
.filter(|view| (**view) as u32 > 12)
.for_each(|view| {
// new buffer index = original buffer index - first_buffer_index
let new_buffer_index = ((*view >> 64) as u32) - first_buffer_index as u32;
update_buffer_index(view, new_buffer_index);
});
}

let mut buffers = Vec::with_capacity(sliced_data_buffers.len() + 1);
buffers.push(views_slice.iter().copied().collect());
buffers.extend_from_slice(sliced_data_buffers);
buffers
}

fn get_byte_view_buffers(data: &ArrayData) -> Vec<Buffer> {
if data.is_empty() {
return Vec::with_capacity(0);
}

let views_slice = data.buffers()[0].typed_data::<u128>();
let views_slice = &views_slice[data.offset()..data.offset() + data.len()];
select_data_buffers(views_slice.to_vec(), data)
}

/// Write array data to a vector of bytes
#[allow(clippy::too_many_arguments)]
fn write_array_data(
@@ -1303,13 +1365,8 @@ fn write_array_data(
)?;
}
} else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
// Slicing the views buffer is safe and easy,
// but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
//
// The current implementation just serializes the raw arrays as given and does not try to optimize anything.
// If users want to "compact" the arrays prior to sending them over IPC,
// they should consider the gc API suggested in #5513
for buffer in array_data.buffers() {
let view_buffers = get_byte_view_buffers(array_data);
for buffer in view_buffers {
offset = write_buffer(
buffer.as_slice(),
buffers,
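
Aside (not part of the commit): a minimal self-contained sketch of the bit manipulation `update_buffer_index` performs when views are re-pointed at the sliced set of data buffers. The helper name and constants are illustrative; plain u128 values stand in for views so the example compiles without the arrow crates.

```rust
// Clear bits 64..96 (the old buffer index) and OR in the new index,
// leaving the length (bits 0..32), prefix (32..64) and offset (96..128) intact.
fn remap_buffer_index(view: &mut u128, new_index: u32) {
    let mask = !(0xFFFF_FFFFu128 << 64);
    *view &= mask;
    *view |= (new_index as u128) << 64;
}

fn main() {
    // A view for a 20-byte value stored in buffer 7 at offset 100.
    let mut view: u128 = 20 | (7u128 << 64) | (100u128 << 96);
    // If buffers 0..=5 were pruned by the slice, buffer 7 becomes index 1
    // in the kept range, i.e. new index = old index - first_buffer_index.
    remap_buffer_index(&mut view, 7 - 6);
    assert_eq!((view >> 64) as u32, 1);   // buffer index updated
    assert_eq!(view as u32, 20);          // length preserved
    assert_eq!((view >> 96) as u32, 100); // offset preserved
}
```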
