Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support slice view data on writing IPC #5610

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 65 additions & 8 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -580,7 +580,12 @@ fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
DataType::BinaryView | DataType::Utf8View => {
// The spec documents the counts only includes the variadic buffers, not the view/null buffers.
// https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
counts.push(array.buffers().len() as i64 - 1);
let views = array.buffers()[0].typed_data::<u128>();
if views.iter().any(|view| *view as u32 > 12) {
counts.push(array.buffers().len() as i64 - 1);
} else {
counts.push(0);
}
}
DataType::Dictionary(_, _) => {
// Do nothing
Expand Down Expand Up @@ -1245,6 +1250,63 @@ fn get_list_array_buffers<O: OffsetSizeTrait>(data: &ArrayData) -> (Buffer, Arra
(offsets, child_data)
}

/// Overwrites the buffer-index field of a packed 128-bit view in place.
///
/// The buffer index occupies bits 64..96 of `value`; the remaining bits
/// (the low 64 bits and the high 32 bits) are left untouched.
/// See [`ByteView`] for the full layout.
fn update_buffer_index(value: &mut u128, new_buffer_index: u32) {
    // Position and extent of the 32-bit buffer-index field inside the view.
    const BUFFER_INDEX_SHIFT: u32 = 64;
    const BUFFER_INDEX_FIELD: u128 = (u32::MAX as u128) << BUFFER_INDEX_SHIFT;

    // Drop the old index, then splice the new one into the same position.
    let preserved = *value & !BUFFER_INDEX_FIELD;
    *value = preserved | (u128::from(new_buffer_index) << BUFFER_INDEX_SHIFT);
}

/// Builds the list of buffers to write for a slice of a byte-view array.
///
/// `views_slice` contains the views of the slice being written and `data` is
/// the array they were taken from (`data.buffers()[0]` is the views buffer,
/// the remaining buffers are the data buffers).
///
/// The result always starts with the views buffer. When at least one view is
/// longer than 12 bytes (i.e. its bytes live in a data buffer), it is followed
/// by the contiguous range of data buffers spanning the smallest and largest
/// buffer index referenced by those views, and the views are rewritten so
/// their buffer indices are relative to the start of that range.
///
/// # Panics
///
/// Panics if a view references a data buffer index that `data` does not have.
fn select_data_buffers(mut views_slice: Vec<u128>, data: &ArrayData) -> Vec<Buffer> {
    // The low 32 bits of a view hold its length; only views longer than
    // 12 bytes reference a data buffer.
    fn is_out_of_line(view: u128) -> bool {
        view as u32 > 12
    }
    // Bits 64..96 of an out-of-line view hold its data buffer index.
    fn buffer_index(view: u128) -> u32 {
        (view >> 64) as u32
    }

    // Views may reference data buffers in any order, so the first and last
    // views do not bound the referenced range: scan every out-of-line view
    // for the smallest and largest buffer index.
    let mut bounds: Option<(u32, u32)> = None;
    for index in views_slice
        .iter()
        .copied()
        .filter(|view| is_out_of_line(*view))
        .map(buffer_index)
    {
        bounds = Some(match bounds {
            None => (index, index),
            Some((lowest, highest)) => (lowest.min(index), highest.max(index)),
        });
    }

    let (first_buffer_index, last_buffer_index) = match bounds {
        Some(bounds) => bounds,
        // Every value is at most 12 bytes: no data buffer is needed.
        None => return vec![Buffer::from_vec(views_slice)],
    };

    let data_buffers = &data.buffers()[1..];
    let sliced_data_buffers =
        &data_buffers[first_buffer_index as usize..=last_buffer_index as usize];

    // If the range does not start at buffer 0, re-map each view's buffer index
    // onto the sliced data buffers. `first_buffer_index` is the minimum over
    // all out-of-line views, so the subtraction cannot underflow.
    if first_buffer_index != 0 {
        for view in views_slice
            .iter_mut()
            .filter(|view| is_out_of_line(**view))
        {
            let new_buffer_index = buffer_index(*view) - first_buffer_index;
            update_buffer_index(view, new_buffer_index);
        }
    }

    let mut buffers = Vec::with_capacity(sliced_data_buffers.len() + 1);
    // Hand the already-owned views over to the buffer instead of copying them.
    buffers.push(Buffer::from_vec(views_slice));
    buffers.extend_from_slice(sliced_data_buffers);
    buffers
}

fn get_byte_view_buffers(data: &ArrayData) -> Vec<Buffer> {
if data.is_empty() {
return Vec::with_capacity(0);
}

let views_slice = data.buffers()[0].typed_data::<u128>();
let views_slice = &views_slice[data.offset()..data.offset() + data.len()];
select_data_buffers(views_slice.to_vec(), data)
}

/// Write array data to a vector of bytes
#[allow(clippy::too_many_arguments)]
fn write_array_data(
Expand Down Expand Up @@ -1303,13 +1365,8 @@ fn write_array_data(
)?;
}
} else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
// Slicing the views buffer is safe and easy,
// but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers
//
// Current implementation just serialize the raw arrays as given and not try to optimize anything.
// If users wants to "compact" the arrays prior to sending them over IPC,
// they should consider the gc API suggested in #5513
for buffer in array_data.buffers() {
let view_buffers = get_byte_view_buffers(array_data);
for buffer in view_buffers {
offset = write_buffer(
buffer.as_slice(),
buffers,
Expand Down
Loading