Skip to content

Commit

Permalink
Add SerializeRecordBatch API, various API scrubbing, make some intege…
Browse files Browse the repository at this point in the history
…r arguments const

Change-Id: I550d70f4fe30e9f3e8050889c409ca5293708477
  • Loading branch information
wesm committed Aug 22, 2017
1 parent 4e0aa3c commit 2952cfb
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 112 deletions.
25 changes: 13 additions & 12 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

namespace arrow {

Status Buffer::Copy(int64_t start, int64_t nbytes, MemoryPool* pool,
Status Buffer::Copy(const int64_t start, const int64_t nbytes, MemoryPool* pool,
std::shared_ptr<Buffer>* out) const {
// Sanity checks
DCHECK_LT(start, size_);
Expand All @@ -42,11 +42,12 @@ Status Buffer::Copy(int64_t start, int64_t nbytes, MemoryPool* pool,
return Status::OK();
}

Status Buffer::Copy(int64_t start, int64_t nbytes, std::shared_ptr<Buffer>* out) const {
Status Buffer::Copy(const int64_t start, const int64_t nbytes,
std::shared_ptr<Buffer>* out) const {
return Copy(start, nbytes, default_memory_pool(), out);
}

bool Buffer::Equals(const Buffer& other, int64_t nbytes) const {
bool Buffer::Equals(const Buffer& other, const int64_t nbytes) const {
return this == &other || (size_ >= nbytes && other.size_ >= nbytes &&
(data_ == other.data_ ||
!memcmp(data_, other.data_, static_cast<size_t>(nbytes))));
Expand All @@ -71,10 +72,10 @@ PoolBuffer::~PoolBuffer() {
}
}

Status PoolBuffer::Reserve(int64_t new_capacity) {
if (!mutable_data_ || new_capacity > capacity_) {
Status PoolBuffer::Reserve(const int64_t capacity) {
if (!mutable_data_ || capacity > capacity_) {
uint8_t* new_data;
new_capacity = BitUtil::RoundUpToMultipleOf64(new_capacity);
int64_t new_capacity = BitUtil::RoundUpToMultipleOf64(capacity);
if (mutable_data_) {
RETURN_NOT_OK(pool_->Reallocate(capacity_, new_capacity, &mutable_data_));
} else {
Expand All @@ -87,7 +88,7 @@ Status PoolBuffer::Reserve(int64_t new_capacity) {
return Status::OK();
}

Status PoolBuffer::Resize(int64_t new_size, bool shrink_to_fit) {
Status PoolBuffer::Resize(const int64_t new_size, bool shrink_to_fit) {
if (!shrink_to_fit || (new_size > size_)) {
RETURN_NOT_OK(Reserve(new_size));
} else {
Expand All @@ -113,26 +114,26 @@ Status PoolBuffer::Resize(int64_t new_size, bool shrink_to_fit) {
}

std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
int64_t offset, int64_t length) {
const int64_t offset, const int64_t length) {
return std::make_shared<MutableBuffer>(buffer, offset, length);
}

MutableBuffer::MutableBuffer(const std::shared_ptr<Buffer>& parent, int64_t offset,
int64_t size)
MutableBuffer::MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_t offset,
const int64_t size)
: MutableBuffer(parent->mutable_data() + offset, size) {
DCHECK(parent->is_mutable()) << "Must pass mutable buffer";
parent_ = parent;
}

Status AllocateBuffer(MemoryPool* pool, int64_t size,
Status AllocateBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<MutableBuffer>* out) {
auto buffer = std::make_shared<PoolBuffer>(pool);
RETURN_NOT_OK(buffer->Resize(size));
*out = buffer;
return Status::OK();
}

Status AllocateResizableBuffer(MemoryPool* pool, int64_t size,
Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<ResizableBuffer>* out) {
auto buffer = std::make_shared<PoolBuffer>(pool);
RETURN_NOT_OK(buffer->Resize(size));
Expand Down
53 changes: 33 additions & 20 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ARROW_EXPORT Buffer {
/// This method makes no assertions about alignment or padding of the buffer but
/// in general we expect buffers to be aligned and padded to 64 bytes. In the future
/// we might add utility methods to help determine if a buffer satisfies this contract.
Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size)
Buffer(const std::shared_ptr<Buffer>& parent, const int64_t offset, const int64_t size)
: Buffer(parent->data() + offset, size) {
parent_ = parent;
}
Expand All @@ -72,11 +72,12 @@ class ARROW_EXPORT Buffer {
bool Equals(const Buffer& other) const;

/// Copy a section of the buffer into a new Buffer.
Status Copy(int64_t start, int64_t nbytes, MemoryPool* pool,
Status Copy(const int64_t start, const int64_t nbytes, MemoryPool* pool,
std::shared_ptr<Buffer>* out) const;

/// Copy a section of the buffer using the default memory pool into a new Buffer.
Status Copy(int64_t start, int64_t nbytes, std::shared_ptr<Buffer>* out) const;
Status Copy(const int64_t start, const int64_t nbytes,
std::shared_ptr<Buffer>* out) const;

int64_t capacity() const { return capacity_; }
const uint8_t* data() const { return data_; }
Expand Down Expand Up @@ -114,24 +115,27 @@ static inline std::shared_ptr<Buffer> GetBufferFromString(const std::string& str
/// Construct a view on passed buffer at the indicated offset and length. This
/// function cannot fail and does no error checking (except in debug builds)
static inline std::shared_ptr<Buffer> SliceBuffer(const std::shared_ptr<Buffer>& buffer,
int64_t offset, int64_t length) {
const int64_t offset,
const int64_t length) {
return std::make_shared<Buffer>(buffer, offset, length);
}

/// Construct a mutable buffer slice. If the parent buffer is not mutable, this
/// will abort in debug builds
std::shared_ptr<Buffer> ARROW_EXPORT
SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer, int64_t offset, int64_t length);
ARROW_EXPORT
std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
const int64_t offset, const int64_t length);

/// A Buffer whose contents can be mutated. May or may not own its data.
class ARROW_EXPORT MutableBuffer : public Buffer {
public:
MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) {
MutableBuffer(uint8_t* data, const int64_t size) : Buffer(data, size) {
mutable_data_ = data;
is_mutable_ = true;
}

MutableBuffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_t offset,
const int64_t size);

protected:
MutableBuffer() : Buffer(nullptr, 0) {}
Expand All @@ -145,20 +149,20 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
///
/// @param shrink_to_fit On deactivating this option, the capacity of the Buffer won't
/// decrease.
virtual Status Resize(int64_t new_size, bool shrink_to_fit = true) = 0;
virtual Status Resize(const int64_t new_size, bool shrink_to_fit = true) = 0;

/// Ensure that buffer has enough memory allocated to fit the indicated
/// capacity (and meets the 64 byte padding requirement in Layout.md).
/// It does not change buffer's reported size.
virtual Status Reserve(int64_t new_capacity) = 0;
virtual Status Reserve(const int64_t new_capacity) = 0;

template <class T>
Status TypedResize(int64_t new_nb_elements, bool shrink_to_fit = true) {
Status TypedResize(const int64_t new_nb_elements, bool shrink_to_fit = true) {
return Resize(sizeof(T) * new_nb_elements, shrink_to_fit);
}

template <class T>
Status TypedReserve(int64_t new_nb_elements) {
Status TypedReserve(const int64_t new_nb_elements) {
return Reserve(sizeof(T) * new_nb_elements);
}

Expand All @@ -172,8 +176,8 @@ class ARROW_EXPORT PoolBuffer : public ResizableBuffer {
explicit PoolBuffer(MemoryPool* pool = nullptr);
virtual ~PoolBuffer();

Status Resize(int64_t new_size, bool shrink_to_fit = true) override;
Status Reserve(int64_t new_capacity) override;
Status Resize(const int64_t new_size, bool shrink_to_fit = true) override;
Status Reserve(const int64_t new_capacity) override;

private:
MemoryPool* pool_;
Expand All @@ -185,7 +189,7 @@ class ARROW_EXPORT BufferBuilder {
: pool_(pool), data_(nullptr), capacity_(0), size_(0) {}

/// Resizes the buffer to the nearest multiple of 64 bytes per Layout.md
Status Resize(int64_t elements) {
Status Resize(const int64_t elements) {
// Resize(0) is a no-op
if (elements == 0) {
return Status::OK();
Expand Down Expand Up @@ -213,7 +217,7 @@ class ARROW_EXPORT BufferBuilder {
}

// Advance pointer and zero out memory
Status Advance(int64_t length) {
Status Advance(const int64_t length) {
if (capacity_ < length + size_) {
int64_t new_capacity = BitUtil::NextPower2(length + size_);
RETURN_NOT_OK(Resize(new_capacity));
Expand Down Expand Up @@ -299,11 +303,20 @@ class ARROW_EXPORT TypedBufferBuilder : public BufferBuilder {
/// \param[out] out the allocated buffer with padding
///
/// \return Status message
Status ARROW_EXPORT AllocateBuffer(MemoryPool* pool, int64_t size,
std::shared_ptr<MutableBuffer>* out);
ARROW_EXPORT
Status AllocateBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<MutableBuffer>* out);

Status ARROW_EXPORT AllocateResizableBuffer(MemoryPool* pool, int64_t size,
std::shared_ptr<ResizableBuffer>* out);
/// Allocate a resizable buffer from a memory pool
///
/// \param[in] pool a memory pool
/// \param[in] size size of buffer to allocate
/// \param[out] out the allocated buffer
///
/// \return Status message
ARROW_EXPORT
Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<ResizableBuffer>* out);

} // namespace arrow

Expand Down
29 changes: 6 additions & 23 deletions cpp/src/arrow/ipc/ipc-read-write-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,27 +137,13 @@ static int g_file_number = 0;

class IpcTestFixture : public io::MemoryMapFixture {
public:
Status DoStandardRoundTrip(const RecordBatch& batch, bool zero_data,
Status DoStandardRoundTrip(const RecordBatch& batch,
std::shared_ptr<RecordBatch>* batch_result) {
int32_t metadata_length;
int64_t body_length;

const int64_t buffer_offset = 0;

if (zero_data) {
RETURN_NOT_OK(ZeroMemoryMap(mmap_.get()));
}
RETURN_NOT_OK(mmap_->Seek(0));

RETURN_NOT_OK(WriteRecordBatch(batch, buffer_offset, mmap_.get(), &metadata_length,
&body_length, pool_));
std::shared_ptr<Buffer> serialized_batch;
RETURN_NOT_OK(SerializeRecordBatch(batch, pool_, &serialized_batch));

std::unique_ptr<Message> message;
RETURN_NOT_OK(ReadMessage(0, metadata_length, mmap_.get(), &message));

io::BufferReader buffer_reader(message->body());
return ReadRecordBatch(*message->metadata(), batch.schema(), &buffer_reader,
batch_result);
io::BufferReader buf_reader(serialized_batch);
return ReadRecordBatch(batch.schema(), 0, &buf_reader, batch_result);
}

Status DoLargeRoundTrip(const RecordBatch& batch, bool zero_data,
Expand Down Expand Up @@ -197,7 +183,7 @@ class IpcTestFixture : public io::MemoryMapFixture {
ASSERT_OK(io::MemoryMapFixture::InitMemoryMap(buffer_size, ss.str(), &mmap_));

std::shared_ptr<RecordBatch> result;
ASSERT_OK(DoStandardRoundTrip(batch, true, &result));
ASSERT_OK(DoStandardRoundTrip(batch, &result));
CheckReadResult(*result, batch);

ASSERT_OK(DoLargeRoundTrip(batch, true, &result));
Expand Down Expand Up @@ -657,9 +643,6 @@ TEST_F(TestIpcRoundTrip, LargeRecordBatch) {
CheckReadResult(*result, batch);

ASSERT_EQ(length, result->num_rows());

// Fails if we try to write this with the normal code path
ASSERT_RAISES(Invalid, DoStandardRoundTrip(batch, false, &result));
}

void CheckBatchDictionaries(const RecordBatch& batch) {
Expand Down
11 changes: 6 additions & 5 deletions cpp/src/arrow/ipc/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,7 +703,7 @@ static Status MakeRecordBatch(FBB& fbb, int64_t length, int64_t body_length,
return Status::OK();
}

Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
Expand All @@ -714,7 +714,7 @@ Status WriteRecordBatchMessage(int64_t length, int64_t body_length,
body_length, out);
}

Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset,
std::shared_ptr<Buffer>* out) {
using TensorDimOffset = flatbuffers::Offset<flatbuf::TensorDim>;
using TensorOffset = flatbuffers::Offset<flatbuf::Tensor>;
Expand Down Expand Up @@ -743,7 +743,8 @@ Status WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
body_length, out);
}

Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
Status WriteDictionaryMessage(const int64_t id, const int64_t length,
const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out) {
Expand Down Expand Up @@ -1106,8 +1107,8 @@ static Status ReadFullMessage(const std::shared_ptr<Buffer>& metadata,
return Message::Open(metadata, body, message);
}

Status ReadMessage(int64_t offset, int32_t metadata_length, io::RandomAccessFile* file,
std::unique_ptr<Message>* message) {
Status ReadMessage(const int64_t offset, const int32_t metadata_length,
io::RandomAccessFile* file, std::unique_ptr<Message>* message) {
std::shared_ptr<Buffer> buffer;
RETURN_NOT_OK(file->ReadAt(offset, metadata_length, &buffer));

Expand Down
23 changes: 13 additions & 10 deletions cpp/src/arrow/ipc/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ class ARROW_EXPORT InputStreamMessageReader : public MessageReader {
/// \param[in] file the seekable file interface to read from
/// \param[out] message the message read
/// \return Status success or failure
Status ARROW_EXPORT ReadMessage(int64_t offset, int32_t metadata_length,
io::RandomAccessFile* file,
std::unique_ptr<Message>* message);
ARROW_EXPORT
Status ReadMessage(const int64_t offset, const int32_t metadata_length,
io::RandomAccessFile* file, std::unique_ptr<Message>* message);

/// \brief Read encapsulated RPC message (metadata and body) from InputStream
///
Expand Down Expand Up @@ -274,15 +274,18 @@ Status ARROW_EXPORT WriteSchemaMessage(const Schema& schema,
DictionaryMemo* dictionary_memo,
std::shared_ptr<Buffer>* out);

Status ARROW_EXPORT WriteRecordBatchMessage(int64_t length, int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
ARROW_EXPORT
Status WriteRecordBatchMessage(const int64_t length, const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);

Status ARROW_EXPORT WriteTensorMessage(const Tensor& tensor, int64_t buffer_start_offset,
std::shared_ptr<Buffer>* out);
ARROW_EXPORT
Status WriteTensorMessage(const Tensor& tensor, const int64_t buffer_start_offset,
std::shared_ptr<Buffer>* out);

Status WriteDictionaryMessage(int64_t id, int64_t length, int64_t body_length,
Status WriteDictionaryMessage(const int64_t id, const int64_t length,
const int64_t body_length,
const std::vector<FieldMetadata>& nodes,
const std::vector<BufferMetadata>& buffers,
std::shared_ptr<Buffer>* out);
Expand Down
Loading

0 comments on commit 2952cfb

Please sign in to comment.