Skip to content

Commit

Permalink
ARROW-8429: [C++] Implement missing checks in IPC MessageDecoder
Browse files Browse the repository at this point in the history
Also allow taking 0-sized slices in Buffer::CopySlice.

Closes #6917 from pitrou/ARROW-8429-ipc-decoder-checks

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
pitrou authored and wesm committed Apr 13, 2020
1 parent a5ee89a commit 1712410
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 33 deletions.
3 changes: 2 additions & 1 deletion cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ Result<std::shared_ptr<Buffer>> Buffer::CopySlice(const int64_t start,
const int64_t nbytes,
MemoryPool* pool) const {
// Sanity checks
ARROW_CHECK_LT(start, size_);
ARROW_CHECK_LE(start, size_);
ARROW_CHECK_LE(nbytes, size_ - start);
DCHECK_GE(nbytes, 0);

ARROW_ASSIGN_OR_RAISE(auto new_buffer, AllocateResizableBuffer(nbytes, pool));
std::memcpy(new_buffer->mutable_data(), data() + start, static_cast<size_t>(nbytes));
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/arrow/buffer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,18 @@ TEST(TestBuffer, CopySlice) {
ASSERT_EQ(0, memcmp(out->data() + out->size(), zeros.data(), zeros.size()));
}

TEST(TestBuffer, CopySliceEmpty) {
auto buf = std::make_shared<Buffer>("");
ASSERT_OK_AND_ASSIGN(auto out, buf->CopySlice(0, 0));
AssertBufferEqual(*out, "");

buf = std::make_shared<Buffer>("1234");
ASSERT_OK_AND_ASSIGN(out, buf->CopySlice(0, 0));
AssertBufferEqual(*out, "");
ASSERT_OK_AND_ASSIGN(out, buf->CopySlice(4, 0));
AssertBufferEqual(*out, "");
}

TEST(TestBuffer, ToHexString) {
const uint8_t data_array[] = "\a0hex string\xa9";
std::basic_string<uint8_t> data_str = data_array;
Expand Down
67 changes: 36 additions & 31 deletions cpp/src/arrow/ipc/message.cc
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ Status CheckMetadataAndGetBodyLength(const Buffer& metadata, int64_t* body_lengt
const flatbuf::Message* fb_message;
RETURN_NOT_OK(internal::VerifyMessage(metadata.data(), metadata.size(), &fb_message));
*body_length = fb_message->bodyLength();
if (*body_length < 0) {
return Status::IOError("Invalid IPC message: negative bodyLength");
}
return Status::OK();
}

Expand Down Expand Up @@ -471,11 +474,11 @@ class MessageDecoder::MessageDecoderImpl {
break;
case State::METADATA: {
auto buffer = std::make_shared<Buffer>(data, next_required_size_);
RETURN_NOT_OK(ConsumeMetadataBuffer(&buffer));
RETURN_NOT_OK(ConsumeMetadataBuffer(buffer));
} break;
case State::BODY: {
auto buffer = std::make_shared<Buffer>(data, next_required_size_);
RETURN_NOT_OK(ConsumeBodyBuffer(&buffer));
RETURN_NOT_OK(ConsumeBodyBuffer(buffer));
} break;
case State::EOS:
return Status::OK();
Expand All @@ -494,9 +497,9 @@ class MessageDecoder::MessageDecoderImpl {
return ConsumeChunks();
}

Status ConsumeBuffer(std::shared_ptr<Buffer>* buffer) {
Status ConsumeBuffer(std::shared_ptr<Buffer> buffer) {
if (buffered_size_ == 0) {
while ((*buffer)->size() >= next_required_size_) {
while (buffer->size() >= next_required_size_) {
auto used_size = next_required_size_;
switch (state_) {
case State::INITIAL:
Expand All @@ -506,37 +509,37 @@ class MessageDecoder::MessageDecoderImpl {
RETURN_NOT_OK(ConsumeMetadataLengthBuffer(buffer));
break;
case State::METADATA:
if ((*buffer)->size() == next_required_size_) {
if (buffer->size() == next_required_size_) {
return ConsumeMetadataBuffer(buffer);
} else {
auto sliced_buffer = SliceBuffer(*buffer, 0, next_required_size_);
RETURN_NOT_OK(ConsumeMetadataBuffer(&sliced_buffer));
auto sliced_buffer = SliceBuffer(buffer, 0, next_required_size_);
RETURN_NOT_OK(ConsumeMetadataBuffer(sliced_buffer));
}
break;
case State::BODY:
if ((*buffer)->size() == next_required_size_) {
if (buffer->size() == next_required_size_) {
return ConsumeBodyBuffer(buffer);
} else {
auto sliced_buffer = SliceBuffer(*buffer, 0, next_required_size_);
RETURN_NOT_OK(ConsumeBodyBuffer(&sliced_buffer));
auto sliced_buffer = SliceBuffer(buffer, 0, next_required_size_);
RETURN_NOT_OK(ConsumeBodyBuffer(sliced_buffer));
}
break;
case State::EOS:
return Status::OK();
}
if ((*buffer)->size() == used_size) {
if (buffer->size() == used_size) {
return Status::OK();
}
*buffer = SliceBuffer(*buffer, used_size);
buffer = SliceBuffer(buffer, used_size);
}
}

if ((*buffer)->size() == 0) {
if (buffer->size() == 0) {
return Status::OK();
}

buffered_size_ += (*buffer)->size();
chunks_.push_back(std::move(*buffer));
buffered_size_ += buffer->size();
chunks_.push_back(std::move(buffer));
return ConsumeChunks();
}

Expand Down Expand Up @@ -576,7 +579,7 @@ class MessageDecoder::MessageDecoderImpl {
return ConsumeInitial(util::SafeLoadAs<int32_t>(data));
}

Status ConsumeInitialBuffer(std::shared_ptr<Buffer>* buffer) {
Status ConsumeInitialBuffer(const std::shared_ptr<Buffer>& buffer) {
ARROW_ASSIGN_OR_RAISE(auto continuation, ConsumeDataBufferInt32(buffer));
return ConsumeInitial(continuation);
}
Expand All @@ -599,21 +602,23 @@ class MessageDecoder::MessageDecoderImpl {
next_required_size_ = 0;
RETURN_NOT_OK(listener_->OnEOS());
return Status::OK();
} else {
} else if (continuation > 0) {
state_ = State::METADATA;
// ARROW-6314: Backwards compatibility for reading old IPC
// messages produced prior to version 0.15.0
next_required_size_ = continuation;
RETURN_NOT_OK(listener_->OnMetadata());
return Status::OK();
} else {
return Status::IOError("Invalid IPC stream: negative continuation token");
}
}

Status ConsumeMetadataLengthData(const uint8_t* data, int64_t size) {
return ConsumeMetadataLength(util::SafeLoadAs<int32_t>(data));
}

Status ConsumeMetadataLengthBuffer(std::shared_ptr<Buffer>* buffer) {
Status ConsumeMetadataLengthBuffer(const std::shared_ptr<Buffer>& buffer) {
ARROW_ASSIGN_OR_RAISE(auto metadata_length, ConsumeDataBufferInt32(buffer));
return ConsumeMetadataLength(metadata_length);
}
Expand All @@ -638,12 +643,12 @@ class MessageDecoder::MessageDecoderImpl {
}
}

Status ConsumeMetadataBuffer(std::shared_ptr<Buffer>* buffer) {
if ((*buffer)->is_cpu()) {
metadata_ = std::move(*buffer);
Status ConsumeMetadataBuffer(const std::shared_ptr<Buffer>& buffer) {
if (buffer->is_cpu()) {
metadata_ = buffer;
} else {
ARROW_ASSIGN_OR_RAISE(
metadata_, Buffer::ViewOrCopy(*buffer, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(metadata_,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
}
return ConsumeMetadata();
}
Expand Down Expand Up @@ -693,8 +698,8 @@ class MessageDecoder::MessageDecoderImpl {
}
}

Status ConsumeBodyBuffer(std::shared_ptr<Buffer>* buffer) {
return ConsumeBody(buffer);
Status ConsumeBodyBuffer(std::shared_ptr<Buffer> buffer) {
return ConsumeBody(&buffer);
}

Status ConsumeBodyChunks() {
Expand Down Expand Up @@ -729,12 +734,12 @@ class MessageDecoder::MessageDecoderImpl {
return Status::OK();
}

Result<int32_t> ConsumeDataBufferInt32(std::shared_ptr<Buffer>* buffer) {
if ((*buffer)->is_cpu()) {
return util::SafeLoadAs<int32_t>((*buffer)->data());
Result<int32_t> ConsumeDataBufferInt32(const std::shared_ptr<Buffer>& buffer) {
if (buffer->is_cpu()) {
return util::SafeLoadAs<int32_t>(buffer->data());
} else {
ARROW_ASSIGN_OR_RAISE(
auto cpu_buffer, Buffer::ViewOrCopy(*buffer, CPUDevice::memory_manager(pool_)));
ARROW_ASSIGN_OR_RAISE(auto cpu_buffer,
Buffer::ViewOrCopy(buffer, CPUDevice::memory_manager(pool_)));
return util::SafeLoadAs<int32_t>(cpu_buffer->data());
}
}
Expand Down Expand Up @@ -800,7 +805,7 @@ Status MessageDecoder::Consume(const uint8_t* data, int64_t size) {
}

Status MessageDecoder::Consume(std::shared_ptr<Buffer> buffer) {
return impl_->ConsumeBuffer(&buffer);
return impl_->ConsumeBuffer(buffer);
}

int64_t MessageDecoder::next_required_size() const { return impl_->next_required_size(); }
Expand Down
2 changes: 1 addition & 1 deletion testing

0 comments on commit 1712410

Please sign in to comment.