Skip to content

Commit

Permalink
ARROW-5357: [Rust] Change Buffer::len to represent total bytes instea…
Browse files Browse the repository at this point in the history
…d of used bytes

Closes #4331 from sunchao/ARROW-5357 and squashes the following commits:

2169ec5 <Chao Sun> Fix Bitmap equality
bbb30cb <Chao Sun> Fix rebase error
1ab4ccc <Chao Sun> Add capacity in Buffer struct
d9be1df <Chao Sun> ARROW-5357:  Change Buffer::len to represent total bytes instead of used bytes

Authored-by: Chao Sun <sunchao@apache.org>
Signed-off-by: Paddy Horan <paddyhoran@hotmail.com>
  • Loading branch information
sunchao authored and paddyhoran committed Feb 21, 2020
1 parent 4e53749 commit 28ec94c
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 26 deletions.
6 changes: 3 additions & 3 deletions rust/arrow/src/array/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2821,7 +2821,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_primitive_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);
let array_data = ArrayData::builder(DataType::Int32).add_buffer(buf2).build();
Int32Array::from(array_data);
Expand All @@ -2831,7 +2831,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_list_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);

let values: [i32; 8] = [0; 8];
Expand All @@ -2851,7 +2851,7 @@ mod tests {
#[should_panic(expected = "memory is not aligned")]
fn test_binary_array_alignment() {
let ptr = memory::allocate_aligned(8);
let buf = unsafe { Buffer::from_raw_parts(ptr, 8) };
let buf = unsafe { Buffer::from_raw_parts(ptr, 8, 8) };
let buf2 = buf.slice(1);

let values: [u8; 12] = [0; 12];
Expand Down
15 changes: 14 additions & 1 deletion rust/arrow/src/bitmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::util::bit_util;

use std::ops::{BitAnd, BitOr};

#[derive(PartialEq, Debug, Clone)]
#[derive(Debug, Clone)]
pub struct Bitmap {
pub(crate) bits: Buffer,
}
Expand Down Expand Up @@ -87,6 +87,19 @@ impl From<Buffer> for Bitmap {
}
}

impl PartialEq for Bitmap {
fn eq(&self, other: &Self) -> bool {
// buffer equality considers capacity, but here we want to only compare
// actual data contents
let self_len = self.bits.len();
let other_len = other.bits.len();
if self_len != other_len {
return false;
}
&self.bits.data()[..self_len] == &other.bits.data()[..self_len]
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
92 changes: 73 additions & 19 deletions rust/arrow/src/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,28 @@ struct BufferData {
/// The raw pointer into the buffer bytes
ptr: *const u8,

/// The length (num of bytes) of the buffer
/// The length (num of bytes) of the buffer. The region `[0, len)` of the buffer
/// is occupied with meaningful data, while the rest `[len, capacity)` is the
/// unoccupied region.
len: usize,

/// Whether this piece of memory is owned by this object
owned: bool,

/// The capacity (num of bytes) of the buffer
/// Invariant: len <= capacity
capacity: usize,
}

impl PartialEq for BufferData {
fn eq(&self, other: &BufferData) -> bool {
if self.len != other.len {
return false;
}
if self.capacity != other.capacity {
return false;
}

unsafe { memory::memcmp(self.ptr, other.ptr, self.len) == 0 }
}
}
Expand All @@ -73,7 +83,7 @@ impl PartialEq for BufferData {
impl Drop for BufferData {
fn drop(&mut self) {
if !self.ptr.is_null() && self.owned {
memory::free_aligned(self.ptr as *mut u8, self.len);
memory::free_aligned(self.ptr as *mut u8, self.capacity);
}
}
}
Expand All @@ -82,8 +92,8 @@ impl Debug for BufferData {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"BufferData {{ ptr: {:?}, len: {}, data: ",
self.ptr, self.len
"BufferData {{ ptr: {:?}, len: {}, capacity: {}, data: ",
self.ptr, self.len, self.capacity
)?;

unsafe {
Expand All @@ -104,13 +114,14 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize) -> Self {
Buffer::build_with_arguments(ptr, len, true)
pub unsafe fn from_raw_parts(ptr: *const u8, len: usize, capacity: usize) -> Self {
Buffer::build_with_arguments(ptr, len, capacity, true)
}

/// Creates a buffer from an existing memory region (must already be byte-aligned), this
Expand All @@ -120,13 +131,14 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
pub unsafe fn from_unowned(ptr: *const u8, len: usize) -> Self {
Buffer::build_with_arguments(ptr, len, false)
pub unsafe fn from_unowned(ptr: *const u8, len: usize, capacity: usize) -> Self {
Buffer::build_with_arguments(ptr, len, capacity, false)
}

/// Creates a buffer from an existing memory region (must already be byte-aligned).
Expand All @@ -135,19 +147,30 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in bytes
/// * `capacity` - Total allocated memory for the pointer `ptr`, in **bytes**
/// * `owned` - Whether the raw parts is owned by this `Buffer`. If true, this `Buffer` will
/// free this memory when dropped, otherwise it will skip freeing the raw parts.
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes.
unsafe fn build_with_arguments(ptr: *const u8, len: usize, owned: bool) -> Self {
unsafe fn build_with_arguments(
ptr: *const u8,
len: usize,
capacity: usize,
owned: bool,
) -> Self {
assert!(
memory::is_aligned(ptr, memory::ALIGNMENT),
"memory not aligned"
);
let buf_data = BufferData { ptr, len, owned };
let buf_data = BufferData {
ptr,
len,
capacity,
owned,
};
Buffer {
data: Arc::new(buf_data),
offset: 0,
Expand All @@ -159,6 +182,11 @@ impl Buffer {
self.data.len - self.offset
}

/// Returns the capacity of this buffer
pub fn capacity(&self) -> usize {
self.data.capacity
}

/// Returns whether the buffer is empty.
pub fn is_empty(&self) -> bool {
self.data.len - self.offset == 0
Expand Down Expand Up @@ -210,7 +238,7 @@ impl Buffer {

/// Returns an empty buffer.
pub fn empty() -> Self {
unsafe { Self::from_raw_parts(::std::ptr::null(), 0) }
unsafe { Self::from_raw_parts(::std::ptr::null(), 0, 0) }
}
}

Expand All @@ -234,7 +262,7 @@ impl<T: AsRef<[u8]>> From<T> for Buffer {
let buffer = memory::allocate_aligned(capacity);
unsafe {
memory::memcpy(buffer, slice.as_ptr(), len);
Buffer::from_raw_parts(buffer, len)
Buffer::from_raw_parts(buffer, len, capacity)
}
}
}
Expand Down Expand Up @@ -504,6 +532,7 @@ impl MutableBuffer {
let buffer_data = BufferData {
ptr: self.data,
len: self.len,
capacity: self.capacity,
owned: true,
};
std::mem::forget(self);
Expand All @@ -527,6 +556,9 @@ impl PartialEq for MutableBuffer {
if self.len != other.len {
return false;
}
if self.capacity != other.capacity {
return false;
}
unsafe { memory::memcmp(self.data, other.data, self.len) == 0 }
}
}
Expand Down Expand Up @@ -584,45 +616,47 @@ mod tests {

#[test]
fn test_from_raw_parts() {
let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0) };
let buf = unsafe { Buffer::from_raw_parts(null_mut(), 0, 0) };
assert_eq!(0, buf.len());
assert_eq!(0, buf.data().len());
assert_eq!(0, buf.capacity());
assert!(buf.raw_data().is_null());

let buf = Buffer::from(&[0, 1, 2, 3, 4]);
assert_eq!(5, buf.len());
assert!(!buf.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf.data());
assert_eq!([0, 1, 2, 3, 4], buf.data());
}

#[test]
fn test_from_vec() {
let buf = Buffer::from(&[0, 1, 2, 3, 4]);
assert_eq!(5, buf.len());
assert!(!buf.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf.data());
assert_eq!([0, 1, 2, 3, 4], buf.data());
}

#[test]
fn test_copy() {
let buf = Buffer::from(&[0, 1, 2, 3, 4]);
let buf2 = buf.clone();
assert_eq!(5, buf2.len());
assert_eq!(64, buf2.capacity());
assert!(!buf2.raw_data().is_null());
assert_eq!(&[0, 1, 2, 3, 4], buf2.data());
assert_eq!([0, 1, 2, 3, 4], buf2.data());
}

#[test]
fn test_slice() {
let buf = Buffer::from(&[2, 4, 6, 8, 10]);
let buf2 = buf.slice(2);

assert_eq!(&[6, 8, 10], buf2.data());
assert_eq!([6, 8, 10], buf2.data());
assert_eq!(3, buf2.len());
assert_eq!(unsafe { buf.raw_data().offset(2) }, buf2.raw_data());

let buf3 = buf2.slice(1);
assert_eq!(&[8, 10], buf3.data());
assert_eq!([8, 10], buf3.data());
assert_eq!(2, buf3.len());
assert_eq!(unsafe { buf.raw_data().offset(3) }, buf3.raw_data());

Expand Down Expand Up @@ -778,18 +812,38 @@ mod tests {
buf.write("aaaa bbbb cccc dddd".as_bytes())
.expect("write should be OK");
assert_eq!(19, buf.len());
assert_eq!(64, buf.capacity());
assert_eq!("aaaa bbbb cccc dddd".as_bytes(), buf.data());

let immutable_buf = buf.freeze();
assert_eq!(19, immutable_buf.len());
assert_eq!(64, immutable_buf.capacity());
assert_eq!("aaaa bbbb cccc dddd".as_bytes(), immutable_buf.data());
}

#[test]
fn test_mutable_equal() -> Result<()> {
let mut buf = MutableBuffer::new(1);
let mut buf2 = MutableBuffer::new(1);

buf.write(&[0xaa])?;
buf2.write(&[0xaa, 0xbb])?;
assert!(buf != buf2);

buf.write(&[0xbb])?;
assert_eq!(buf, buf2);

buf2.reserve(65)?;
assert!(buf != buf2);

Ok(())
}

#[test]
fn test_access_concurrently() {
let buffer = Buffer::from(vec![1, 2, 3, 4, 5]);
let buffer2 = buffer.clone();
assert_eq!(&[1, 2, 3, 4, 5], buffer.data());
assert_eq!([1, 2, 3, 4, 5], buffer.data());

let buffer_copy = thread::spawn(move || {
// access buffer in another thread.
Expand Down
9 changes: 6 additions & 3 deletions rust/arrow/src/tensor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,10 @@ impl<'a, T: ArrowPrimitiveType> Tensor<'a, T> {

/// The total number of elements in the `Tensor`
pub fn size(&self) -> usize {
(self.buffer.len() / mem::size_of::<T::Native>())
match self.shape {
None => 0,
Some(ref s) => s.iter().fold(1, |a, b| a * b),
}
}

/// Indicates if the data is laid out contiguously in memory
Expand Down Expand Up @@ -255,7 +258,7 @@ mod tests {
fn test_zero_dim() {
let buf = Buffer::from(&[1]);
let tensor = UInt8Tensor::new(buf, None, None, None);
assert_eq!(1, tensor.size());
assert_eq!(0, tensor.size());
assert_eq!(None, tensor.shape());
assert_eq!(None, tensor.names());
assert_eq!(0, tensor.ndim());
Expand All @@ -265,7 +268,7 @@ mod tests {

let buf = Buffer::from(&[1, 2, 2, 2]);
let tensor = Int32Tensor::new(buf, None, None, None);
assert_eq!(1, tensor.size());
assert_eq!(0, tensor.size());
assert_eq!(None, tensor.shape());
assert_eq!(None, tensor.names());
assert_eq!(0, tensor.ndim());
Expand Down

0 comments on commit 28ec94c

Please sign in to comment.