Skip to content

Commit

Permalink
For review
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya committed Nov 15, 2022
1 parent 8311372 commit e969e5f
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 33 deletions.
32 changes: 20 additions & 12 deletions arrow-array/src/array/primitive_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
/// assert_eq!(c, Int32Array::from(vec![Some(11), Some(15), None]));
/// # }
/// ```
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, ArrowError>
pub fn unary_mut<F>(self, op: F) -> Result<PrimitiveArray<T>, PrimitiveArray<T>>
where
F: Fn(T::Native) -> T::Native,
{
Expand All @@ -424,25 +424,32 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
let null_count = self.null_count();
let null_buffer = data.null_buffer().map(|b| b.bit_slice(data.offset(), len));

let mut buffers = self.data.get_buffers();
let buffer = buffers.remove(0);
let buffer = self.data.buffers()[0].clone();
let buffer_len = buffer.len();

drop(self.data);

let mutable_buffer = buffer.into_mutable(buffer_len);

let buffer = match mutable_buffer {
match mutable_buffer {
Ok(mut mutable_buffer) => {
mutable_buffer
.typed_data_mut()
.iter_mut()
.for_each(|l| *l = op(*l));
Ok(mutable_buffer.into())
Ok(unsafe {
build_primitive_array(
len,
mutable_buffer.into(),
null_count,
null_buffer,
)
})
}
Err(_) => Err(ArrowError::InvalidArgumentError(
"Not a mutable array because its buffer is shared.".to_string(),
)),
}?;
Ok(unsafe { build_primitive_array(len, buffer, null_count, null_buffer) })
Err(buffer) => Err(unsafe {
build_primitive_array(len, buffer, null_count, null_buffer)
}),
}
}

/// Applies a unary and fallible function to all valid values in a primitive array
Expand Down Expand Up @@ -550,8 +557,9 @@ impl<T: ArrowPrimitiveType> PrimitiveArray<T> {
let len = self.len();
let null_bit_buffer = self.data.null_buffer().cloned();

let mut buffers = self.data.get_buffers();
let buffer = buffers.remove(0);
let buffer = (&self.data.buffers()[0]).clone();

drop(self.data);

let builder = buffer
.into_mutable(0)
Expand Down
5 changes: 3 additions & 2 deletions arrow-array/src/builder/null_buffer_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ impl NullBufferBuilder {
}
}

pub fn new_from_buffer(buffer: Option<MutableBuffer>, capacity: usize) -> Self {
let bitmap_builder = buffer.map(BooleanBufferBuilder::new_from_buffer);
/// Creates a new builder from a `MutableBuffer`.
pub fn new_from_buffer(buffer: MutableBuffer, capacity: usize) -> Self {
let bitmap_builder = Some(BooleanBufferBuilder::new_from_buffer(buffer));
Self {
bitmap_builder,
len: 0,
Expand Down
9 changes: 5 additions & 4 deletions arrow-array/src/builder/primitive_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
) -> Self {
let capacity = values_buffer.capacity();

let null_buffer_builder = null_buffer
.map(|buffer| NullBufferBuilder::new_from_buffer(buffer, capacity))
.unwrap_or_else(|| NullBufferBuilder::new(capacity));

Self {
values_builder: BufferBuilder::<T::Native>::new_from_buffer(values_buffer),
null_buffer_builder: NullBufferBuilder::new_from_buffer(
null_buffer,
capacity,
),
null_buffer_builder,
}
}

Expand Down
11 changes: 4 additions & 7 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::fmt::Debug;
use std::iter::FromIterator;
use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, mem, usize};
use std::{convert::AsRef, usize};

use crate::alloc::{Allocation, Deallocation};
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
Expand Down Expand Up @@ -229,19 +229,16 @@ impl Buffer {
}

/// Returns `MutableBuffer` for mutating the buffer if this buffer is not shared.
/// Returns `Err` if this is shared or its allocation is from an external source.
pub fn into_mutable(self, len: usize) -> Result<MutableBuffer, Self> {
let offset_ptr = self.as_ptr();
let offset = self.offset;
let length = self.length;
Arc::try_unwrap(self.data)
.map(|bytes| {
.and_then(|bytes| {
// The pointer of underlying buffer should not be offset.
assert_eq!(offset_ptr, bytes.ptr().as_ptr());

let mutable_buffer =
MutableBuffer::from_ptr(bytes.ptr(), len, bytes.capacity());
mem::forget(bytes);
mutable_buffer
MutableBuffer::from_bytes(bytes, len).map_err(Arc::new)
})
.map_err(|bytes| Buffer {
data: bytes,
Expand Down
17 changes: 13 additions & 4 deletions arrow-buffer/src/buffer/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::{
native::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
use std::mem;
use std::ptr::NonNull;

/// A [`MutableBuffer`] is Arrow's interface to build a [`Buffer`] out of items or slices of items.
Expand Down Expand Up @@ -92,13 +93,21 @@ impl MutableBuffer {
}
}

/// Allocates a new [MutableBuffer] from given pointer `ptr`, `capacity`.
pub(crate) fn from_ptr(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
Self {
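/// Creates a [MutableBuffer] that takes over the allocation of the given `Bytes` without copying.
/// Returns the `Bytes` back as `Err` if its deallocation is not `Deallocation::Arrow`.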
pub(crate) fn from_bytes(bytes: Bytes, len: usize) -> Result<Self, Bytes> {
if !matches!(bytes.deallocation(), Deallocation::Arrow(_)) {
return Err(bytes);
}

let capacity = bytes.capacity();
let ptr = bytes.ptr();
mem::forget(bytes);

Ok(Self {
data: ptr,
len,
capacity,
}
})
}

/// Creates a new [MutableBuffer] with capacity and length capable of holding `len` bits.
Expand Down
5 changes: 5 additions & 0 deletions arrow-buffer/src/bytes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ impl Bytes {
Deallocation::Custom(_) => 0,
}
}

/// Returns a reference to the [`Deallocation`] held in this value's `deallocation` field.
#[inline]
pub(crate) fn deallocation(&self) -> &Deallocation {
&self.deallocation
}
}

// Deallocation is Send + Sync, repeating the bound here makes that refactoring safe
Expand Down
4 changes: 0 additions & 4 deletions arrow-data/src/data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,6 @@ impl ArrayData {
&self.buffers[..]
}

pub fn get_buffers(self) -> Vec<Buffer> {
self.buffers
}

/// Returns a slice of children data arrays
pub fn child_data(&self) -> &[ArrayData] {
&self.child_data[..]
Expand Down

0 comments on commit e969e5f

Please sign in to comment.