diff --git a/datafusion/functions/src/string/common.rs b/datafusion/functions/src/string/common.rs index bda6b429ac772..133fe7f023056 100644 --- a/datafusion/functions/src/string/common.rs +++ b/datafusion/functions/src/string/common.rs @@ -19,10 +19,10 @@ use std::sync::Arc; -use crate::strings::append_view; +use crate::strings::{GenericStringArrayBuilder, StringViewArrayBuilder, append_view}; use arrow::array::{ - Array, ArrayRef, GenericStringArray, GenericStringBuilder, NullBufferBuilder, - OffsetSizeTrait, StringViewArray, StringViewBuilder, new_null_array, + Array, ArrayRef, GenericStringArray, NullBufferBuilder, OffsetSizeTrait, + StringViewArray, new_null_array, }; use arrow::buffer::{Buffer, ScalarBuffer}; use arrow::datatypes::DataType; @@ -349,18 +349,30 @@ where >(array, op)?)), DataType::Utf8View => { let string_array = as_string_view_array(array)?; - let mut string_builder = - StringViewBuilder::with_capacity(string_array.len()); - - for str in string_array.iter() { - if let Some(str) = str { - string_builder.append_value(op(str)); - } else { - string_builder.append_null(); + let item_len = string_array.len(); + // Null-preserving: reuse the input null buffer as the output null buffer. + let nulls = string_array.nulls().cloned(); + let mut builder = StringViewArrayBuilder::with_capacity(item_len); + + if let Some(ref n) = nulls { + for i in 0..item_len { + if n.is_null(i) { + builder.append_placeholder(); + } else { + // SAFETY: `n.is_null(i)` was false in the branch above. + let s = unsafe { string_array.value_unchecked(i) }; + builder.append_value(&op(s)); + } + } + } else { + for i in 0..item_len { + // SAFETY: no null buffer means every index is valid. + let s = unsafe { string_array.value_unchecked(i) }; + builder.append_value(&op(s)); } } - Ok(ColumnarValue::Array(Arc::new(string_builder.finish()))) + Ok(ColumnarValue::Array(Arc::new(builder.finish(nulls)?))) } other => exec_err!("Unsupported data type {other:?} for function {name}"), }, @@ -399,18 +411,29 @@ where // Values contain non-ASCII. let item_len = string_array.len(); - let capacity = string_array.value_data().len() + PRE_ALLOC_BYTES; - let mut builder = GenericStringBuilder::::with_capacity(item_len, capacity); + let capacity = value_data.len() + PRE_ALLOC_BYTES; + // Null-preserving: reuse the input null buffer as the output null buffer. + let nulls = string_array.nulls().cloned(); + let mut builder = GenericStringArrayBuilder::::with_capacity(item_len, capacity); - if string_array.null_count() == 0 { - let iter = - (0..item_len).map(|i| Some(op(unsafe { string_array.value_unchecked(i) }))); - builder.extend(iter); + if let Some(ref n) = nulls { + for i in 0..item_len { + if n.is_null(i) { + builder.append_placeholder(); + } else { + // SAFETY: `n.is_null(i)` was false in the branch above. + let s = unsafe { string_array.value_unchecked(i) }; + builder.append_value(&op(s)); + } + } } else { - let iter = string_array.iter().map(|string| string.map(&op)); - builder.extend(iter); + for i in 0..item_len { + // SAFETY: no null buffer means every index is valid. + let s = unsafe { string_array.value_unchecked(i) }; + builder.append_value(&op(s)); + } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(builder.finish(nulls)?)) } /// All values of string_array are ASCII, and when converting case, there is no changes in the byte @@ -438,7 +461,8 @@ where let values = Buffer::from_vec(bytes); let offsets = string_array.offsets().clone(); let nulls = string_array.nulls().cloned(); - // SAFETY: offsets and nulls are consistent with the input array. + + // SAFETY: we can reuse the offsets and nulls from the input array Ok(Arc::new(unsafe { GenericStringArray::::new_unchecked(offsets, values, nulls) })) diff --git a/datafusion/functions/src/strings.rs b/datafusion/functions/src/strings.rs index f986ffd2e3756..8564a3e720297 100644 --- a/datafusion/functions/src/strings.rs +++ b/datafusion/functions/src/strings.rs @@ -15,13 +15,14 @@ // specific language governing permissions and limitations // under the License. +use std::marker::PhantomData; use std::mem::size_of; use datafusion_common::{Result, exec_datafusion_err, internal_err}; use arrow::array::{ - Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, LargeStringArray, - StringArray, StringViewArray, make_view, + Array, ArrayAccessor, ArrayDataBuilder, BinaryArray, ByteView, GenericStringArray, + LargeStringArray, OffsetSizeTrait, StringArray, StringViewArray, make_view, }; use arrow::buffer::{Buffer, MutableBuffer, NullBuffer, ScalarBuffer}; use arrow::datatypes::DataType; @@ -29,9 +30,14 @@ use arrow::datatypes::DataType; /// Builder used by `concat`/`concat_ws` to assemble a [`StringArray`] one row /// at a time from multiple input columns. /// -/// Each row is written via repeated `write` calls, followed by a single -/// `append_offset` call to commit the row. The output null buffer is supplied -/// by the caller at `finish` time. +/// Each row is written via repeated [`Self::write`] calls (one per input +/// fragment) followed by a single [`Self::append_offset`] to commit the row. +/// The output null buffer is computed in bulk by the caller and supplied to +/// [`Self::finish`], avoiding per-row [`arrow::array::builder::NullBufferBuilder`] +/// work. +/// +/// For the common "produce one `&str` per row" pattern, prefer +/// [`GenericStringArrayBuilder`][GenericStringArrayBuilder] instead. pub(crate) struct ConcatStringBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, @@ -157,9 +163,16 @@ impl ConcatStringBuilder { /// Builder used by `concat`/`concat_ws` to assemble a [`StringViewArray`] one /// row at a time from multiple input columns. /// -/// Each row is written via repeated `write` calls, followed by a single -/// `append_offset` call to commit the row as a single string view. The output -/// null buffer is supplied by the caller at `finish` time. +/// Each row is written via repeated [`Self::write`] calls (one per input +/// fragment) followed by a single [`Self::append_offset`] to commit the row +/// as a single string view. The output null buffer is supplied by the caller +/// at [`Self::finish`] time, avoiding per-row +/// [`arrow::array::builder::NullBufferBuilder`] work. +/// +/// For the common "produce one `&str` per row" pattern, prefer +/// [`StringViewArrayBuilder`] instead. +/// +/// [`StringViewArray`]: arrow::array::StringViewArray pub(crate) struct ConcatStringViewBuilder { views: Vec, data: Vec, @@ -292,6 +305,9 @@ impl ConcatStringViewBuilder { /// Builder used by `concat`/`concat_ws` to assemble a [`LargeStringArray`] one /// row at a time from multiple input columns. See [`ConcatStringBuilder`] for /// details on the row-composition contract. +/// +/// For the common "produce one `&str` per row" pattern, prefer +/// [`GenericStringArrayBuilder`][GenericStringArrayBuilder] instead. pub(crate) struct ConcatLargeStringBuilder { offsets_buffer: MutableBuffer, value_buffer: MutableBuffer, @@ -414,6 +430,241 @@ impl ConcatLargeStringBuilder { } } +// ---------------------------------------------------------------------------- +// Bulk-nulls builders +// +// These builders are similar to Arrow's `GenericStringBuilder` and +// `StringViewBuilder`, except that callers must pass the NULL bitmap to +// `finish()`, rather than maintaining it iteratively (per-row). For callers +// that can compute the NULL bitmap in bulk (which is true of many +// string-related UDFs), this can be significantly more efficient. +// +// For a row known to be null, call `append_placeholder` to advance the row +// count without touching the value buffer; the placeholder slot will be +// masked by the caller-supplied null buffer. +// ---------------------------------------------------------------------------- + +/// Builder for a [`GenericStringArray`] that defers null tracking to +/// `finish`. Instantiate with `O = i32` for [`StringArray`] (Utf8) or +/// `O = i64` for [`LargeStringArray`] (LargeUtf8). +pub(crate) struct GenericStringArrayBuilder { + offsets_buffer: MutableBuffer, + value_buffer: MutableBuffer, + placeholder_count: usize, + _phantom: PhantomData, +} + +impl GenericStringArrayBuilder { + pub fn with_capacity(item_capacity: usize, data_capacity: usize) -> Self { + let capacity = item_capacity + .checked_add(1) + .map(|i| i.saturating_mul(size_of::())) + .expect("capacity integer overflow"); + + let mut offsets_buffer = MutableBuffer::with_capacity(capacity); + offsets_buffer.push(O::usize_as(0)); + Self { + offsets_buffer, + value_buffer: MutableBuffer::with_capacity(data_capacity), + placeholder_count: 0, + _phantom: PhantomData, + } + } + + /// Append `value` as the next row. + /// + /// # Panics + /// + /// Panics if the cumulative byte length exceeds `O::MAX`. + pub fn append_value(&mut self, value: &str) { + self.value_buffer.extend_from_slice(value.as_bytes()); + let next_offset = + O::from_usize(self.value_buffer.len()).expect("byte array offset overflow"); + self.offsets_buffer.push(next_offset); + } + + /// Append an empty placeholder row. The corresponding slot must be masked + /// as null by the null buffer passed to `finish`. + pub fn append_placeholder(&mut self) { + let next_offset = + O::from_usize(self.value_buffer.len()).expect("byte array offset overflow"); + self.offsets_buffer.push(next_offset); + self.placeholder_count += 1; + } + + /// Finalize into a [`GenericStringArray`] using the caller-supplied + /// null buffer. + /// + /// # Errors + /// + /// Returns an error when `null_buffer.len()` does not match the number of + /// appended rows. + pub fn finish( + self, + null_buffer: Option, + ) -> Result> { + let row_count = self.offsets_buffer.len() / size_of::() - 1; + if let Some(ref n) = null_buffer + && n.len() != row_count + { + return internal_err!( + "Null buffer length ({}) must match row count ({row_count})", + n.len() + ); + } + let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count()); + debug_assert!( + null_count >= self.placeholder_count, + "{} placeholder rows but null buffer has {null_count} nulls", + self.placeholder_count, + ); + let array_data = ArrayDataBuilder::new(GenericStringArray::::DATA_TYPE) + .len(row_count) + .add_buffer(self.offsets_buffer.into()) + .add_buffer(self.value_buffer.into()) + .nulls(null_buffer); + // SAFETY: every appended value came from a `&str`, so the value + // buffer is valid UTF-8 and offsets are monotonically non-decreasing. + let array_data = unsafe { array_data.build_unchecked() }; + Ok(GenericStringArray::::from(array_data)) + } +} + +/// Starting size for the long-string data block; matches Arrow's +/// `GenericByteViewBuilder` default. +const STARTING_BLOCK_SIZE: u32 = 8 * 1024; +/// Maximum size each long-string data block grows to; matches Arrow's +/// `GenericByteViewBuilder` default. +const MAX_BLOCK_SIZE: u32 = 2 * 1024 * 1024; + +/// Builder for a [`StringViewArray`] that defers null tracking to `finish`. +/// +/// Modeled on Arrow's [`arrow::array::builder::StringViewBuilder`] but +/// without per-row [`arrow::array::builder::NullBufferBuilder`] maintenance. +/// Long strings (> 12 bytes) are appended into an in-progress data block; +/// short strings are inlined into the view itself. When the in-progress block +/// fills up it is flushed into `completed` and a new block — double the size +/// of the last, capped at [`MAX_BLOCK_SIZE`] — is started. +pub(crate) struct StringViewArrayBuilder { + views: Vec, + in_progress: Vec, + completed: Vec, + /// Current block-size target; doubles each time a block is flushed, up to + /// [`MAX_BLOCK_SIZE`]. + block_size: u32, + placeholder_count: usize, +} + +impl StringViewArrayBuilder { + pub fn with_capacity(item_capacity: usize) -> Self { + Self { + views: Vec::with_capacity(item_capacity), + in_progress: Vec::new(), + completed: Vec::new(), + block_size: STARTING_BLOCK_SIZE, + placeholder_count: 0, + } + } + + /// Doubles the block-size target (capped at [`MAX_BLOCK_SIZE`]) and + /// returns the new size. The first call returns `2 * STARTING_BLOCK_SIZE`. + fn next_block_size(&mut self) -> u32 { + if self.block_size < MAX_BLOCK_SIZE { + self.block_size = self.block_size.saturating_mul(2); + } + self.block_size + } + + /// Append `value` as the next row. + /// + /// # Panics + /// + /// Panics if the value length, the in-progress buffer offset, or the + /// number of completed buffers exceeds `i32::MAX`. The ByteView spec + /// uses signed 32-bit integers for these fields; exceeding `i32::MAX` + /// would produce an array that does not round-trip through Arrow IPC + /// (see ). + #[inline] + pub fn append_value(&mut self, value: &str) { + let v = value.as_bytes(); + let length: u32 = + i32::try_from(v.len()).expect("value length exceeds i32::MAX") as u32; + if length <= 12 { + self.views.push(make_view(v, 0, 0)); + return; + } + + let required_cap = self.in_progress.len() + length as usize; + if self.in_progress.capacity() < required_cap { + self.flush_in_progress(); + let to_reserve = (length as usize).max(self.next_block_size() as usize); + self.in_progress.reserve(to_reserve); + } + + let buffer_index: u32 = i32::try_from(self.completed.len()) + .expect("buffer count exceeds i32::MAX") + as u32; + let offset: u32 = i32::try_from(self.in_progress.len()) + .expect("offset exceeds i32::MAX") as u32; + self.in_progress.extend_from_slice(v); + self.views.push(make_view(v, buffer_index, offset)); + } + + /// Append an empty placeholder row. The corresponding slot must be + /// masked as null by the null buffer passed to `finish`. + #[inline] + pub fn append_placeholder(&mut self) { + // Zero-length inline view — `length` field is 0, no buffer ref. + self.views.push(0); + self.placeholder_count += 1; + } + + fn flush_in_progress(&mut self) { + if !self.in_progress.is_empty() { + let block = std::mem::take(&mut self.in_progress); + self.completed.push(Buffer::from_vec(block)); + } + } + + /// Finalize into a [`StringViewArray`] using the caller-supplied null + /// buffer. + /// + /// # Errors + /// + /// Returns an error when `null_buffer.len()` does not match the number of + /// appended rows. + pub fn finish(mut self, null_buffer: Option) -> Result { + if let Some(ref n) = null_buffer + && n.len() != self.views.len() + { + return internal_err!( + "Null buffer length ({}) must match row count ({})", + n.len(), + self.views.len() + ); + } + let null_count = null_buffer.as_ref().map_or(0, |n| n.null_count()); + debug_assert!( + null_count >= self.placeholder_count, + "{} placeholder rows but null buffer has {null_count} nulls", + self.placeholder_count, + ); + self.flush_in_progress(); + // SAFETY: every long-string view references bytes we wrote ourselves + // into `self.completed`, with prefixes derived from those same bytes. + // Inline views were built from valid `&str`. Placeholder views are + // zero-length with no buffer reference. + let array = unsafe { + StringViewArray::new_unchecked( + ScalarBuffer::from(self.views), + self.completed, + null_buffer, + ) + }; + Ok(array) + } +} + /// Append a new view to the views buffer with the given substr. /// /// Callers are responsible for their own null tracking. @@ -512,4 +763,170 @@ mod tests { fn test_overflow_concat_large_string_builder() { let _builder = ConcatLargeStringBuilder::with_capacity(usize::MAX, usize::MAX); } + + #[test] + fn string_array_builder_empty() { + let builder = GenericStringArrayBuilder::::with_capacity(0, 0); + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), 0); + } + + #[test] + fn string_array_builder_no_nulls() { + let mut builder = GenericStringArrayBuilder::::with_capacity(3, 16); + builder.append_value("foo"); + builder.append_value(""); + builder.append_value("hello world"); + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), 3); + assert_eq!(array.value(0), "foo"); + assert_eq!(array.value(1), ""); + assert_eq!(array.value(2), "hello world"); + assert_eq!(array.null_count(), 0); + } + + #[test] + fn string_array_builder_with_nulls() { + let mut builder = GenericStringArrayBuilder::::with_capacity(3, 8); + builder.append_value("a"); + builder.append_placeholder(); + builder.append_value("c"); + let nulls = NullBuffer::from(vec![true, false, true]); + let array = builder.finish(Some(nulls)).unwrap(); + assert_eq!(array.len(), 3); + assert_eq!(array.value(0), "a"); + assert!(array.is_null(1)); + assert_eq!(array.value(2), "c"); + } + + #[test] + fn string_array_builder_null_buffer_length_mismatch() { + let mut builder = GenericStringArrayBuilder::::with_capacity(2, 4); + builder.append_value("a"); + builder.append_value("b"); + let nulls = NullBuffer::from(vec![true, false, true]); + assert!(builder.finish(Some(nulls)).is_err()); + } + + #[test] + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_array_builder_placeholder_without_null_mask() { + let mut builder = GenericStringArrayBuilder::::with_capacity(2, 4); + builder.append_value("a"); + builder.append_placeholder(); + // Slot 1 is a placeholder but the null buffer doesn't mark it null. + let nulls = NullBuffer::from(vec![true, true]); + let _ = builder.finish(Some(nulls)); + } + + #[test] + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_array_builder_placeholder_with_none_null_buffer() { + let mut builder = GenericStringArrayBuilder::::with_capacity(1, 4); + builder.append_placeholder(); + let _ = builder.finish(None); + } + + #[test] + fn large_string_array_builder_with_nulls() { + let mut builder = GenericStringArrayBuilder::::with_capacity(3, 8); + builder.append_value("a"); + builder.append_placeholder(); + builder.append_value("c"); + let nulls = NullBuffer::from(vec![true, false, true]); + let array = builder.finish(Some(nulls)).unwrap(); + assert_eq!(array.len(), 3); + assert_eq!(array.value(0), "a"); + assert!(array.is_null(1)); + assert_eq!(array.value(2), "c"); + } + + #[test] + fn string_view_array_builder_empty() { + let builder = StringViewArrayBuilder::with_capacity(0); + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), 0); + } + + #[test] + fn string_view_array_builder_inline_and_buffer() { + let mut builder = StringViewArrayBuilder::with_capacity(3); + builder.append_value("short"); // ≤ 12 bytes, inline + builder.append_value("a string longer than twelve bytes"); + builder.append_value(""); + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), 3); + assert_eq!(array.value(0), "short"); + assert_eq!(array.value(1), "a string longer than twelve bytes"); + assert_eq!(array.value(2), ""); + } + + #[test] + fn string_view_array_builder_with_nulls() { + let mut builder = StringViewArrayBuilder::with_capacity(4); + builder.append_value("a string longer than twelve bytes"); + builder.append_placeholder(); + builder.append_value("short"); + builder.append_placeholder(); + let nulls = NullBuffer::from(vec![true, false, true, false]); + let array = builder.finish(Some(nulls)).unwrap(); + assert_eq!(array.len(), 4); + assert_eq!(array.value(0), "a string longer than twelve bytes"); + assert!(array.is_null(1)); + assert_eq!(array.value(2), "short"); + assert!(array.is_null(3)); + } + + #[test] + fn string_view_array_builder_null_buffer_length_mismatch() { + let mut builder = StringViewArrayBuilder::with_capacity(2); + builder.append_value("a"); + builder.append_value("b"); + let nulls = NullBuffer::from(vec![true, false, true]); + assert!(builder.finish(Some(nulls)).is_err()); + } + + #[test] + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_view_array_builder_placeholder_without_null_mask() { + let mut builder = StringViewArrayBuilder::with_capacity(2); + builder.append_value("a"); + builder.append_placeholder(); + let nulls = NullBuffer::from(vec![true, true]); + let _ = builder.finish(Some(nulls)); + } + + #[test] + #[cfg(debug_assertions)] + #[should_panic(expected = "placeholder rows")] + fn string_view_array_builder_placeholder_with_none_null_buffer() { + let mut builder = StringViewArrayBuilder::with_capacity(1); + builder.append_placeholder(); + let _ = builder.finish(None); + } + + #[test] + fn string_view_array_builder_flushes_full_blocks() { + // Each value is 300 bytes. The first data block is 2 × STARTING_BLOCK_SIZE + // = 16 KiB, so ~50 values saturate it and the rest spill into additional + // blocks. + let value = "x".repeat(300); + let mut builder = StringViewArrayBuilder::with_capacity(100); + for _ in 0..100 { + builder.append_value(&value); + } + let array = builder.finish(None).unwrap(); + assert_eq!(array.len(), 100); + assert!( + array.data_buffers().len() > 1, + "expected multiple data buffers, got {}", + array.data_buffers().len() + ); + for i in 0..100 { + assert_eq!(array.value(i), value); + } + } }