Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement GenericByteViewArray::gc for compacting StringViewArray and ByteViewArray #5707

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
342 changes: 342 additions & 0 deletions arrow-array/src/array/byte_view_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder, ByteView};
use arrow_schema::{ArrowError, DataType};
use std::any::Any;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::sync::Arc;
Expand Down Expand Up @@ -265,6 +266,115 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
phantom: Default::default(),
}
}

/// Returns whether the buffers are compact
pub(self) fn compact_check(&self) -> Vec<bool> {
let mut checkers: Vec<_> = self
.buffers
.iter()
.map(|b| CompactChecker::new(b.len()))
.collect();

for (i, view) in self.views.iter().enumerate() {
let view = ByteView::from(*view);
if self.is_null(i) || view.length <= 12 {
continue;
}
checkers[view.buffer_index as usize]
.accumulate(view.offset as usize, view.length as usize);
}
checkers.into_iter().map(|c| c.finish()).collect()
}

/// Returns a buffer compact version of this array
///
/// The original array will *not* be modified
///
/// # Garbage Collection
///
/// Before GC:
/// ```text
/// ┌──────┐
/// │......│
/// │......│
/// ┌────────────────────┐ ┌ ─ ─ ─ ▶ │Data1 │ Large buffer
/// │ View 1 │─ ─ ─ ─ │......│ with data that
/// ├────────────────────┤ │......│ is not referred
/// │ View 2 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data2 │ to by View 1 or
/// └────────────────────┘ │......│ View 2
/// │......│
/// 2 views, refer to │......│
/// small portions of a └──────┘
/// large buffer
/// ```
///
/// After GC:
///
/// ```text
/// ┌────────────────────┐ ┌─────┐ After gc, only
/// │ View 1 │─ ─ ─ ─ ─ ─ ─ ─▶ │Data1│ data that is
/// ├────────────────────┤ ┌ ─ ─ ─ ▶ │Data2│ pointed to by
/// │ View 2 │─ ─ ─ ─ └─────┘ the views is
/// └────────────────────┘ left
///
///
/// 2 views
/// ```
/// This method will compact the data buffers to only include the data
/// that is pointed to by the views,
/// and return a new array with the compacted data buffers.
/// The original array will be left as is.
pub fn gc(&self) -> Self {
let check_result = self.compact_check();

if check_result.iter().all(|x| *x) {
return self.clone();
}

let mut new_views = Vec::with_capacity(self.views.len());
let mut new_bufs: Vec<Vec<u8>> = vec![vec![]; self.buffers.len()];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of buffers may shrink after gc. Every buffer should be filled up to block_size.

See GenericByteViewBuilder::append_value.

for (i, view) in self.views.iter().enumerate() {
let mut bv = ByteView::from(*view);
let idx = bv.buffer_index as usize;
if self.is_null(i) || bv.length <= 12 || check_result[idx] {
new_views.push(*view);
continue;
}
// copy data to new buffer
let data = self.buffers.get(idx).unwrap();
let offset = new_bufs[idx].len();
let len = bv.length as usize;
new_bufs[idx].extend_from_slice(
data.get(bv.offset as usize..bv.offset as usize + len)
.unwrap(),
);
// update view
bv.offset = offset as u32;

new_views.push(bv.into());
}
let new_bufs: Vec<_> = new_bufs.into_iter().map(Buffer::from_vec).collect();

let new_views = ScalarBuffer::from(new_views);

let new_buffers = self
.buffers
.iter()
.enumerate()
.map(|(idx, buf)| {
if check_result[idx] {
buf.clone()
} else {
new_bufs[idx].clone()
}
})
.collect();

let mut compacted = self.clone();
compacted.buffers = new_buffers;
compacted.views = new_views;
compacted
}
}

impl<T: ByteViewType + ?Sized> Debug for GenericByteViewArray<T> {
Expand Down Expand Up @@ -482,6 +592,67 @@ impl From<Vec<Option<String>>> for StringViewArray {
}
}

/// A helper struct that used to check if the array is compact view
///
/// The checker is lazy and will not check the array until `finish` is called.
///
/// This is based on the assumption that the array will most likely to be not compact,
/// so it is likely to scan the entire array.
///
/// Then it is better to do the check at once, rather than doing it for each accumulate operation.
struct CompactChecker {
length: usize,
intervals: BTreeMap<usize, usize>,
}
Comment on lines +595 to +606
Copy link
Contributor Author

@ClSlaid ClSlaid May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there is a better algorithm for this.
It's the most straightforward and simplest way I could come up with, though.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend we pull the "compact checking" algorithm into a new PR to discuss it -- I am not sure about the assumption that StringViewArrays will mostly often be compacted (I would actually expect the opposite)


impl CompactChecker {
/// Create a new checker with the expected length of the buffer
pub fn new(length: usize) -> Self {
Self {
length,
intervals: BTreeMap::new(),
}
}

/// Accumulate a new covered interval to the checker
pub fn accumulate(&mut self, offset: usize, length: usize) {
if length == 0 {
return;
}
let end = offset + length;
if end > self.length {
panic!(
"Invalid interval: offset {} length {} is out of bound of length {}",
offset, length, self.length
);
}
if let Some(val) = self.intervals.get_mut(&offset) {
if *val < end {
*val = end;
}
} else {
self.intervals.insert(offset, end);
}
}

/// Check if the checker is fully covered
pub fn finish(self) -> bool {
// check if the coverage is continuous and full
let mut last_end = 0;

for (start, end) in self.intervals.iter() {
if *start > last_end {
return false;
}
if *end > last_end {
last_end = *end;
}
}

last_end == self.length
}
}

#[cfg(test)]
mod tests {
use crate::builder::{BinaryViewBuilder, StringViewBuilder};
Expand Down Expand Up @@ -645,4 +816,175 @@ mod tests {

StringViewArray::new(views, buffers, None);
}

#[test]
#[should_panic(expected = "Invalid interval: offset 0 length 13 is out of bound of length 12")]
fn test_compact_checker() {
use super::CompactChecker;

// single coverage, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 10);
assert!(checker.finish());

// single coverage, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
assert!(!checker.finish());

// multiple coverage, no overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 4);
assert!(!checker.finish());

//multiple coverage, no overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 5);
assert!(checker.finish());

//multiple coverage, overlapping, partial
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 5);
assert!(!checker.finish());

//multiple coverage, overlapping, full
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(4, 6);
assert!(checker.finish());

//mutiple coverage, no overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, out of order
let mut checker = CompactChecker::new(10);
checker.accumulate(4, 6);
checker.accumulate(0, 4);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(4, 6);
checker.accumulate(5, 5);
assert!(checker.finish());

// multiple coverage, overlapping, full, containing null
//
// this case is for attacking those implementation that only check
// the lower-bound of the interval
let mut checker = CompactChecker::new(10);
checker.accumulate(0, 5);
checker.accumulate(5, 0);
checker.accumulate(1, 9);
checker.accumulate(2, 3);
checker.accumulate(3, 1);
checker.accumulate(9, 1);
assert!(checker.finish());

// panic case, out of bound
let mut checker = CompactChecker::new(12);
checker.accumulate(0, 13);
checker.finish();
}

#[test]
fn test_gc() {
// ---------------------------------------------------------------------
// test compact on compacted data

let array = {
let mut builder = StringViewBuilder::new();
builder.append_value("I look at you all");
builder.append_option(Some("see the love there that's sleeping"));
builder.finish()
};
let compacted = array.gc();
// verify it is a shallow copy
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());

// ---------------------------------------------------------------------
// test compact on non-compacted data

let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_value("while my guitar gently weeps");
builder.finish()
};
// shrink the view
let mut view = ByteView::from(array.views[0]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![view.into()]);
array.views = new_views;
let compacted = array.gc();
// verify it is a deep copy
assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
// verify content
assert_eq!(array.value(0), compacted.value(0));
// verify compacted
assert!(compacted.compact_check().iter().all(|x| *x));

// ---------------------------------------------------------------------
// test compact on array containing null

let mut array = {
let mut builder = StringViewBuilder::new();
builder.append_null();
builder.append_option(Some("I don't know why nobody told you"));
builder.finish()
};

let mut view = ByteView::from(array.views[1]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]);
Comment on lines +951 to +953
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dn't undertand what this is doing

I think you can more easily create a stringview with multiple buffers like this:

// Use a small capacity so we end up with multiple views
let mut builder = StringViewBuilder::with_capacity(20);
builder.append_value("hello");
builder.append_null();
builder.append_value("longer than 12 bytes");
builder.append_value("another than 12 bytes");
builder.append_null();
builder.append_value("small");

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not familiar with the rest of the code, so I made it out brutally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that StringViewArrays will mostly often be compacted (I would actually expect the opposite)

We assume the same idea, it's likely to be not compacted.

array.views = new_views;

let compacted = array.gc();

assert_ne!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
assert_eq!(array.value(0), compacted.value(0));
assert_eq!(array.value(1), compacted.value(1));
assert!(compacted.compact_check().iter().all(|x| *x));

// ---------------------------------------------------------------------
// test compact on multiple buffers

let mut array = {
let mut builder = StringViewBuilder::new().with_block_size(15);
builder.append_value("how to unfold your love");
builder.append_option(Some("I don't know how someone controlled you"));
builder.finish()
};

// verify it's not same buffer
assert_eq!(array.buffers.len(), 2);
// shrink the view

let mut view = ByteView::from(array.views[1]);
view.length = 15;
let new_views = ScalarBuffer::from(vec![array.views[0], view.into()]);
array.views = new_views;

let compacted = array.gc();
assert_eq!(compacted.buffers.len(), 2);
assert_eq!(array.value(0), compacted.value(0));
assert_eq!(array.value(1), compacted.value(1));
assert_eq!(array.buffers[0].as_ptr(), compacted.buffers[0].as_ptr());
assert_ne!(array.buffers[1].as_ptr(), compacted.buffers[1].as_ptr());
assert!(compacted.compact_check().iter().all(|x| *x));
}
}