
interleave with memory prefetching: 2x faster? #6813

@richox

Description

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
Memory prefetching is widely used when randomly accessing array items, which makes it a good fit for some cases of the take/interleave kernels.
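
For illustration, here is a minimal sketch of the prefetch-ahead pattern on a random gather loop (the gather function and PREFETCH_AHEAD constant are only illustrative, not arrow-rs APIs; like the benchmark below it needs a nightly toolchain for core_intrinsics):

#![feature(core_intrinsics)]
use std::intrinsics::prefetch_read_data;

// illustrative example, not an arrow-rs API: gathers values[idx] for each
// index, prefetching the element that will be needed a few iterations ahead
// so it is (hopefully) already in cache when the loop reaches it
fn gather(values: &[u64], indices: &[usize]) -> Vec<u64> {
    const PREFETCH_AHEAD: usize = 4;
    let mut out = Vec::with_capacity(indices.len());
    for (i, &idx) in indices.iter().enumerate() {
        if let Some(&next) = indices.get(i + PREFETCH_AHEAD) {
            // locality = 3 asks the CPU to keep the line in all cache levels
            unsafe { prefetch_read_data(&values[next] as *const u64, 3) };
        }
        out.push(values[idx]);
    }
    out
}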

I did a quick benchmark; for completely random input, interleave with prefetching is about 2x faster than the current interleave implementation:

$ cargo run --release
disable prefetch time: 1.305 sec
 enable prefetch time: 0.695 sec
disable prefetch time: 1.331 sec
 enable prefetch time: 0.690 sec
disable prefetch time: 1.318 sec
 enable prefetch time: 0.724 sec

benchmark code:

#![feature(core_intrinsics)]

use std::error::Error;
use std::intrinsics::prefetch_read_data;
use std::sync::Arc;
use std::time::Instant;

use arrow::array::*;
use arrow::buffer::*;
use arrow::datatypes::*;
use arrow::error::ArrowError;

fn main() -> Result<(), Box<dyn Error>> {
    let generate_random_int_string = || format!("SomeTestStr={}", rand::random::<u64>());
    let mut arrays: Vec<ArrayRef> = vec![];
    for _ in 0..1000 {
        let str_array = (0..1000)
            .map(|_| generate_random_int_string())
            .collect::<Vec<_>>();
        arrays.push(Arc::new(StringArray::from_iter_values(str_array)));
    }

    let mut random_indices = vec![];
    for _ in 0..10000000 {
        random_indices.push((
            rand::random::<usize>() % arrays.len(),
            rand::random::<usize>() % arrays[0].len(),
        ));
    }
    let random_indices_len = random_indices.len();
    for i in 0..random_indices_len {
        random_indices.swap(i, rand::random::<usize>() % random_indices_len);
    }

    fn timer<T>(name: &str, f: impl FnOnce() -> T) -> T {
        let start_time = Instant::now();
        let ret = f();
        println!("{name} time: {:.3} sec", start_time.elapsed().as_secs_f64());
        ret
    }

    // warm up
    assert_eq!(
        &interleave_without_prefetch(&arrays, &random_indices)?,
        &interleave_with_prefetch(&arrays, &random_indices)?,
    );

    // benchmark
    for _ in 0..3 {
        let batch1 = timer("disable prefetch", || interleave_without_prefetch(&arrays, &random_indices))?;
        let batch2 = timer(" enable prefetch", || interleave_with_prefetch(&arrays, &random_indices))?;
        assert_eq!(&batch1, &batch2);
    }
    Ok(())
}

fn interleave_without_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {
    arrow::compute::interleave(&values.iter().map(|v| v.as_ref()).collect::<Vec<_>>(), indices)
}

fn interleave_with_prefetch(
    values: &[ArrayRef],
    indices: &[(usize, usize)],
) -> Result<ArrayRef, ArrowError> {

    struct Interleave<'a, T> {
        arrays: Vec<&'a T>,
        nulls: Option<NullBuffer>,
    }

    impl<'a, T: Array + 'static> Interleave<'a, T> {
        fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
            let mut has_nulls = false;
            let arrays: Vec<&T> = values
                .iter()
                .map(|x| {
                    has_nulls = has_nulls || x.null_count() != 0;
                    x.as_any().downcast_ref().unwrap()
                })
                .collect();

            let nulls = match has_nulls {
                true => {
                    let mut builder = BooleanBufferBuilder::new(indices.len());
                    for (a, b) in indices {
                        let v = arrays[*a].is_valid(*b);
                        builder.append(v)
                    }
                    Some(NullBuffer::new(builder.finish()))
                }
                false => None,
            };

            Self { arrays, nulls }
        }
    }

    fn interleave_bytes<T: ByteArrayType>(
        values: &[&dyn Array],
        indices: &[(usize, usize)],
    ) -> Result<ArrayRef, ArrowError> {
        let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);

        let mut capacity = 0;
        let mut offsets = BufferBuilder::<T::Offset>::new(indices.len() + 1);
        offsets.append(T::Offset::from_usize(0).unwrap());
        for (a, b) in indices {
            let o = interleaved.arrays[*a].value_offsets();
            let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
            capacity += element_len;
            offsets.append(T::Offset::from_usize(capacity).expect("overflow"));
        }

        let mut values = MutableBuffer::new(capacity);
        for (i, (a, b)) in indices.iter().enumerate() {
            ////////////////////////////////////////////////////////////
            // prefetch next values
            ////////////////////////////////////////////////////////////
            const PREFETCH_AHEAD: usize = 4;
            if i + PREFETCH_AHEAD < indices.len() {
                let (pa, pb) = indices[i + PREFETCH_AHEAD];
                unsafe {
                    let array = interleaved.arrays.get_unchecked(pa);
                    let start = *array.value_offsets().get_unchecked(pb);
                    let ptr = array.values().as_ptr().wrapping_add(start.as_usize());
                    prefetch_read_data(ptr, 3);
                }
            }
            values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
        }

        // Safety: safe by construction
        let array = unsafe {
            let offsets = OffsetBuffer::new_unchecked(offsets.finish().into());
            GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
        };
        Ok(Arc::new(array))
    }

    let values = values.iter().map(|v| v.as_ref()).collect::<Vec<_>>();
    match values.get(0).map(|v| v.data_type()) {
        Some(DataType::Utf8) => interleave_bytes::<GenericStringType<i32>>(&values, indices),
        Some(DataType::Binary) => interleave_bytes::<GenericBinaryType<i32>>(&values, indices),
        _ => arrow::compute::interleave(&values, indices),
    }
}

Describe the solution you'd like

  1. I would like to introduce memory prefetching in arrow-rs. Since not all scenarios benefit from prefetching, I suggest adding a separate kernel function such as interleave_with_memory_prefetching so we don't break the current implementation.
  2. prefetch_read_data is still unstable, and I'm not sure how we can use it on stable Rust (one possible stable alternative is sketched below).
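
For point 2, one option is a small helper around the stable std::arch::x86_64::_mm_prefetch intrinsic. Below is a minimal sketch, assuming the x86_64-only intrinsic is acceptable; the prefetch_read name is hypothetical, and other targets simply fall back to a no-op:

// sketch of a possible stable-Rust prefetch helper (name is illustrative):
// hint the CPU to load the cache line at `ptr` for an upcoming read;
// on targets without a stable prefetch intrinsic this does nothing
#[inline(always)]
fn prefetch_read<T>(ptr: *const T) {
    #[cfg(target_arch = "x86_64")]
    {
        use std::arch::x86_64::{_mm_prefetch, _MM_HINT_T0};
        // _MM_HINT_T0 roughly corresponds to locality = 3 in prefetch_read_data
        unsafe { _mm_prefetch::<{ _MM_HINT_T0 }>(ptr as *const i8) };
    }
    #[cfg(not(target_arch = "x86_64"))]
    {
        let _ = ptr;
    }
}

With such a helper, the prefetch_read_data(ptr, 3) call in the benchmark above could be replaced by prefetch_read(ptr) without requiring a nightly toolchain.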

Describe alternatives you've considered

Additional context
