diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 3fe5fba7f47f7..9045e2fa4277e 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -22,6 +22,8 @@
 #include "gtest/gtest.h"
 
 #include "arrow/status.h"
+#include "arrow/ipc/test-common.h"
+#include "arrow/ipc/api.h"
 #include "arrow/test-util.h"
 
 #include "arrow/gpu/cuda_api.h"
@@ -262,5 +264,41 @@ TEST_F(TestCudaBufferReader, Basics) {
   ASSERT_EQ(0, std::memcmp(stack_buffer, host_data + 925, 75));
 }
 
+class TestCudaArrowIpc : public TestCudaBufferBase {
+ public:
+  void SetUp() {
+    TestCudaBufferBase::SetUp();
+    pool_ = default_memory_pool();
+  }
+
+ protected:
+  MemoryPool* pool_;
+};
+
+TEST_F(TestCudaArrowIpc, BasicWriteRead) {
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(ipc::MakeIntRecordBatch(&batch));
+
+  std::shared_ptr<CudaBuffer> device_serialized;
+  ASSERT_OK(arrow::gpu::SerializeRecordBatch(*batch, context_.get(),
+                                             &device_serialized));
+
+  // Test that ReadRecordBatch works properly
+  std::shared_ptr<RecordBatch> device_batch;
+  ASSERT_OK(ReadRecordBatch(batch->schema(), device_serialized, &device_batch));
+
+  // Copy data from device, read batch, and compare
+  std::shared_ptr<MutableBuffer> host_buffer;
+  int64_t size = device_serialized->size();
+  ASSERT_OK(AllocateBuffer(pool_, size, &host_buffer));
+  ASSERT_OK(device_serialized->CopyToHost(0, size, host_buffer->mutable_data()));
+
+  std::shared_ptr<RecordBatch> cpu_batch;
+  io::BufferReader cpu_reader(host_buffer);
+  ASSERT_OK(ipc::ReadRecordBatch(batch->schema(), &cpu_reader, &cpu_batch));
+
+  ipc::CompareBatch(*batch, *cpu_batch);
+}
+
 }  // namespace gpu
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.cc b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
new file mode 100644
index 0000000000000..42f6d16d19e1e
--- /dev/null
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/gpu/cuda_arrow_ipc.h"
+
+#include <cstdint>
+#include <memory>
+#include <sstream>
+
+#include "arrow/buffer.h"
+#include "arrow/ipc/Message_generated.h"
+#include "arrow/ipc/message.h"
+#include "arrow/ipc/writer.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/util/visibility.h"
+
+#include "arrow/gpu/cuda_context.h"
+#include "arrow/gpu/cuda_memory.h"
+
+namespace arrow {
+
+namespace flatbuf = org::apache::arrow::flatbuf;
+
+namespace gpu {
+
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out) {
+  int64_t size = 0;
+  RETURN_NOT_OK(ipc::GetRecordBatchSize(batch, &size));
+
+  std::shared_ptr<CudaBuffer> buffer;
+  RETURN_NOT_OK(ctx->Allocate(size, &buffer));
+
+  CudaBufferWriter stream(buffer);
+
+  // Use 8MB buffering, which yields generally good performance
+  RETURN_NOT_OK(stream.SetBufferSize(1 << 23));
+
+  // We use the default memory pool here since any allocations are ephemeral
+  RETURN_NOT_OK(ipc::SerializeRecordBatch(batch, default_memory_pool(), &stream));
+  *out = buffer;
+  return Status::OK();
+}
+
+Status ReadMessage(CudaBufferReader* stream, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message) {
+  uint8_t length_buf[4] = {0};
+
+  int64_t bytes_read = 0;
+  RETURN_NOT_OK(stream->Read(sizeof(int32_t), &bytes_read, length_buf));
+  if (bytes_read != sizeof(int32_t)) {
+    // Hit the end of the stream before a length prefix: no message
+    *message = nullptr;
+    return Status::OK();
+  }
+
+  const int32_t metadata_length = *reinterpret_cast<const int32_t*>(length_buf);
+
+  if (metadata_length == 0) {
+    // Optional 0 EOS control message
+    *message = nullptr;
+    return Status::OK();
+  }
+
+  // Metadata is copied to host memory so flatbuffers can be inspected there
+  std::shared_ptr<MutableBuffer> metadata;
+  RETURN_NOT_OK(AllocateBuffer(pool, metadata_length, &metadata));
+  RETURN_NOT_OK(stream->Read(metadata_length, &bytes_read, metadata->mutable_data()));
+  if (bytes_read != metadata_length) {
+    return Status::IOError("Unexpected end of stream trying to read message");
+  }
+
+  auto fb_message = flatbuf::GetMessage(metadata->data());
+  int64_t body_length = fb_message->bodyLength();
+
+  // Zero copy: the body buffer remains on the device
+  std::shared_ptr<Buffer> body;
+  RETURN_NOT_OK(stream->Read(body_length, &body));
+  if (body->size() < body_length) {
+    std::stringstream ss;
+    ss << "Expected to be able to read " << body_length
+       << " bytes for message body, got " << body->size();
+    return Status::IOError(ss.str());
+  }
+
+  return ipc::Message::Open(metadata, body, message);
+}
+
+}  // namespace gpu
+}  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_arrow_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
index 37d3c948b2812..f86cff51889c3 100644
--- a/cpp/src/arrow/gpu/cuda_arrow_ipc.h
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -43,6 +43,26 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
                             std::shared_ptr<CudaBuffer>* out);
 
+/// \brief Read Arrow IPC message located on GPU device
+/// \param[in] stream a CudaBufferReader
+/// \param[in] pool a MemoryPool to allocate CPU memory for the metadata
+/// \param[out] message the deserialized message, body still on device
+///
+/// This function reads the message metadata into host memory, but leaves the
+/// message body on the device
+ARROW_EXPORT
+Status ReadMessage(CudaBufferReader* stream, MemoryPool* pool,
+                   std::unique_ptr<ipc::Message>* message);
+
+/// \brief ReadRecordBatch specialized to handle metadata on CUDA device
+/// \param[in] schema the Schema describing the serialized batch
+/// \param[in] buffer the device buffer containing the complete IPC message
+/// \param[out] out the reconstructed RecordBatch, with device pointers
+ARROW_EXPORT
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer,
+                       std::shared_ptr<RecordBatch>* out);
+
 }  // namespace gpu
 }  // namespace arrow