diff --git a/cpp/src/arrow/gpu/CMakeLists.txt b/cpp/src/arrow/gpu/CMakeLists.txt
index 7cbaa76c41503..2db038f5814b7 100644
--- a/cpp/src/arrow/gpu/CMakeLists.txt
+++ b/cpp/src/arrow/gpu/CMakeLists.txt
@@ -27,6 +27,7 @@ find_package(CUDA REQUIRED)
 include_directories(SYSTEM ${CUDA_INCLUDE_DIRS})
 
 set(ARROW_GPU_SRCS
+  cuda_arrow_ipc.cc
   cuda_context.cc
   cuda_memory.cc
 )
@@ -46,6 +47,7 @@ ADD_ARROW_LIB(arrow_gpu
 
 install(FILES
   cuda_api.h
+  cuda_arrow_ipc.h
   cuda_context.h
   cuda_memory.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/arrow/gpu")
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 1aa4199d1b2a3..3fe5fba7f47f7 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -77,6 +77,43 @@ TEST_F(TestCudaBuffer, CopyFromHost) {
   AssertCudaBufferEquals(*device_buffer, host_buffer->data(), kSize);
 }
 
+// IPC only supported on Linux
+#if defined(__linux)
+
+TEST_F(TestCudaBuffer, DISABLED_ExportForIpc) {
+  // For this test to work, a second process needs to be spawned
+  const int64_t kSize = 1000;
+  std::shared_ptr<CudaBuffer> device_buffer;
+  ASSERT_OK(context_->Allocate(kSize, &device_buffer));
+
+  std::shared_ptr<PoolBuffer> host_buffer;
+  ASSERT_OK(test::MakeRandomBytePoolBuffer(kSize, default_memory_pool(), &host_buffer));
+  ASSERT_OK(device_buffer->CopyFromHost(0, host_buffer->data(), kSize));
+
+  // Export for IPC and serialize
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle;
+  ASSERT_OK(device_buffer->ExportForIpc(&ipc_handle));
+
+  std::shared_ptr<Buffer> serialized_handle;
+  ASSERT_OK(ipc_handle->Serialize(default_memory_pool(), &serialized_handle));
+
+  // Deserialize IPC handle and open
+  std::unique_ptr<CudaIpcMemHandle> ipc_handle2;
+  ASSERT_OK(CudaIpcMemHandle::FromBuffer(serialized_handle->data(), &ipc_handle2));
+
+  std::shared_ptr<CudaBuffer> ipc_buffer;
+  ASSERT_OK(context_->OpenIpcBuffer(*ipc_handle2, &ipc_buffer));
+
+  ASSERT_EQ(kSize, ipc_buffer->size());
+
+  std::shared_ptr<MutableBuffer> ipc_data;
+  ASSERT_OK(AllocateBuffer(default_memory_pool(), kSize, &ipc_data));
+  ASSERT_OK(ipc_buffer->CopyToHost(0, kSize, ipc_data->mutable_data()));
+  ASSERT_EQ(0, std::memcmp(ipc_data->data(), host_buffer->data(), kSize));
+}
+
+#endif
+
 class TestCudaBufferWriter : public TestCudaBufferBase {
  public:
   void SetUp() { TestCudaBufferBase::SetUp(); }
diff --git a/cpp/src/arrow/gpu/cuda_api.h b/cpp/src/arrow/gpu/cuda_api.h
index b9f2ba3b752fd..b05f00f5835c5 100644
--- a/cpp/src/arrow/gpu/cuda_api.h
+++ b/cpp/src/arrow/gpu/cuda_api.h
@@ -19,6 +19,7 @@
 #ifndef ARROW_GPU_CUDA_API_H
 #define ARROW_GPU_CUDA_API_H
 
+#include "arrow/gpu/cuda_arrow_ipc.h"
 #include "arrow/gpu/cuda_context.h"
 #include "arrow/gpu/cuda_memory.h"
 #include "arrow/gpu/cuda_version.h"
diff --git a/cpp/src/arrow/gpu/cuda_ipc.h b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
similarity index 61%
rename from cpp/src/arrow/gpu/cuda_ipc.h
rename to cpp/src/arrow/gpu/cuda_arrow_ipc.h
index ccdc13eb3795f..37d3c948b2812 100644
--- a/cpp/src/arrow/gpu/cuda_ipc.h
+++ b/cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_GPU_CUDA_MEMORY_H
-#define ARROW_GPU_CUDA_MEMORY_H
+#ifndef ARROW_GPU_CUDA_ARROW_IPC_H
+#define ARROW_GPU_CUDA_ARROW_IPC_H
 
 #include <cstdint>
 #include <memory>
 
@@ -25,31 +25,25 @@
 #include "arrow/status.h"
 #include "arrow/util/visibility.h"
 
-#include "arrow/cuda_memory.h"
-
 namespace arrow {
+
+class RecordBatch;
+
 namespace gpu {
 
-/// \brief Write record batch message to GPU device memory
-///
-///
-ARROW_EXPORT
-SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
-                     std::shared_ptr<CudaBuffer>* out);
+class CudaBuffer;
+class CudaContext;
 
-/// \brief Write record batch to pre-allocated GPU device memory
-///
-/// \param[in] batch the record batch to write
-/// \param[in] out the CudaBufferWriter to write the output to
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch the record batch to write
+/// \param[in] ctx the CudaContext to allocate device memory from
+/// \param[out] out the returned device buffer containing the record batch message
 /// \return Status
-///
-/// The CudaBufferWriter must have enough pre-allocated space to accommodate
-/// the record batch. You can use arrow::ipc::GetRecordBatchSize to compute
-/// this
 ARROW_EXPORT
-SerializeRecordBatch(const RecordBatch& batch, CudaBufferWriter* out);
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out);
 
 }  // namespace gpu
 }  // namespace arrow
 
-#endif  // ARROW_GPU_CUDA_MEMORY_H
+#endif  // ARROW_GPU_CUDA_ARROW_IPC_H
diff --git a/cpp/src/arrow/gpu/cuda_common.h b/cpp/src/arrow/gpu/cuda_common.h
index 5a949a5880677..c06c1a21ff481 100644
--- a/cpp/src/arrow/gpu/cuda_common.h
+++ b/cpp/src/arrow/gpu/cuda_common.h
@@ -40,7 +40,7 @@ namespace gpu {
     if (ret != CUDA_SUCCESS) {                                                \
       std::stringstream ss;                                                   \
       ss << "Cuda Driver API call in " << __FILE__ << " at line " << __LINE__ \
-         << " failed: " << #STMT;                                             \
+         << " failed with code " << ret << ": " << #STMT;                     \
       return Status::IOError(ss.str());                                       \
     }                                                                         \
   } while (0)
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index feabfc5252a0e..430ecab6fbe07 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -88,6 +88,14 @@ class CudaContext::CudaContextImpl {
     return Status::OK();
   }
 
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle) {
+    CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
+    CUipcMemHandle cu_handle;
+    CU_RETURN_NOT_OK(cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(data)));
+    *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+    return Status::OK();
+  }
+
   Status OpenIpcBuffer(const CudaIpcMemHandle& ipc_handle, uint8_t** out) {
     CU_RETURN_NOT_OK(cuCtxSetCurrent(context_));
     auto handle = reinterpret_cast<const CUipcMemHandle*>(ipc_handle.handle());
@@ -151,12 +159,17 @@ class CudaDeviceManager::CudaDeviceManagerImpl {
     return Status::OK();
   }
 
+  Status CreateNewContext(int device_number, std::shared_ptr<CudaContext>* out) {
+    *out = std::shared_ptr<CudaContext>(new CudaContext());
+    return (*out)->impl_->Init(devices_[device_number]);
+  }
+
   Status GetContext(int device_number, std::shared_ptr<CudaContext>* out) {
     auto it = contexts_.find(device_number);
     if (it == contexts_.end()) {
-      auto ctx = std::shared_ptr<CudaContext>(new CudaContext());
-      RETURN_NOT_OK(ctx->impl_->Init(devices_[device_number]));
-      contexts_[device_number] = *out = ctx;
+      std::shared_ptr<CudaContext> new_context;
+      RETURN_NOT_OK(CreateNewContext(device_number, &new_context));
+      contexts_[device_number] = *out = new_context;
     } else {
       *out = it->second;
     }
@@ -193,6 +206,11 @@ Status CudaDeviceManager::GetContext(int device_number,
   return impl_->GetContext(device_number, out);
 }
 
+Status CudaDeviceManager::CreateNewContext(int device_number,
+                                           std::shared_ptr<CudaContext>* out) {
+  return impl_->CreateNewContext(device_number, out);
+}
+
 Status CudaDeviceManager::AllocateHost(int64_t nbytes,
                                        std::shared_ptr<CudaHostBuffer>* out) {
   uint8_t* data = nullptr;
@@ -221,6 +239,11 @@ Status CudaContext::Allocate(int64_t nbytes, std::shared_ptr<CudaBuffer>* out) {
   return Status::OK();
 }
 
+Status CudaContext::ExportIpcBuffer(uint8_t* data,
+                                    std::unique_ptr<CudaIpcMemHandle>* handle) {
+  return impl_->ExportIpcBuffer(data, handle);
+}
+
 Status CudaContext::CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes) {
   return impl_->CopyHostToDevice(dst, src, nbytes);
 }
@@ -229,6 +252,8 @@ Status CudaContext::CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t n
   return impl_->CopyDeviceToHost(dst, src, nbytes);
 }
 
+Status CudaContext::Close() { return impl_->Close(); }
+
 Status CudaContext::Free(uint8_t* device_ptr, int64_t nbytes) {
   return impl_->Free(device_ptr, nbytes);
 }
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index c433976608cb1..6471059612349 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -36,9 +36,14 @@ class ARROW_EXPORT CudaDeviceManager {
  public:
   static Status GetInstance(CudaDeviceManager** manager);
 
-  /// \brief Get the CUDA driver context for a particular device
+  /// \brief Get the shared CUDA driver context for a particular device
   Status GetContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
 
+  /// \brief Create a new context for a given device number
+  ///
+  /// In general, code should use GetContext
+  Status CreateNewContext(int gpu_number, std::shared_ptr<CudaContext>* ctx);
+
   Status AllocateHost(int64_t nbytes, std::shared_ptr<CudaHostBuffer>* buffer);
 
   Status FreeHost(uint8_t* data, int64_t nbytes);
@@ -63,7 +68,7 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContex
 
   CudaContext();
 
+  Status ExportIpcBuffer(uint8_t* data, std::unique_ptr<CudaIpcMemHandle>* handle);
   Status CopyHostToDevice(uint8_t* dst, const uint8_t* src, int64_t nbytes);
   Status CopyDeviceToHost(uint8_t* dst, const uint8_t* src, int64_t nbytes);
   Status Free(uint8_t* device_ptr, int64_t nbytes);
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index 7c6464a7bf48f..3c88fe2d59fbc 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -114,11 +114,8 @@ Status CudaBuffer::ExportForIpc(std::unique_ptr<CudaIpcMemHandle>* handle) {
   if (is_ipc_) {
     return Status::Invalid("Buffer has already been exported for IPC");
   }
-  CUipcMemHandle cu_handle;
-  CU_RETURN_NOT_OK(
-      cuIpcGetMemHandle(&cu_handle, reinterpret_cast<CUdeviceptr>(mutable_data_)));
-  is_ipc_ = true;
-  *handle = std::unique_ptr<CudaIpcMemHandle>(new CudaIpcMemHandle(&cu_handle));
+  RETURN_NOT_OK(context_->ExportIpcBuffer(mutable_data_, handle));
+  own_data_ = false;
   return Status::OK();
 }
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index b9003540a61fb..d5407371f35f5 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -103,15 +103,16 @@ class ARROW_EXPORT CudaIpcMemHandle {
   /// \return Status
   Status Serialize(MemoryPool* pool, std::shared_ptr<Buffer>* out) const;
 
-  const void* handle() const;
-
  private:
   explicit CudaIpcMemHandle(const void* handle);
 
   struct CudaIpcMemHandleImpl;
   std::unique_ptr<CudaIpcMemHandleImpl> impl_;
 
+  const void* handle() const;
+
   friend CudaBuffer;
+  friend CudaContext;
 };
 
 /// \class CudaBufferReader
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 9c05cba918d83..e17b974adfcad 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -901,14 +901,19 @@ Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
   RETURN_NOT_OK(AllocateBuffer(pool, size, &buffer));
 
   io::FixedSizeBufferWriter stream(buffer);
-  int32_t metadata_length = 0;
-  int64_t body_length = 0;
-  RETURN_NOT_OK(WriteRecordBatch(batch, 0, &stream, &metadata_length, &body_length, pool,
-                                 kMaxNestingDepth, true));
+  RETURN_NOT_OK(SerializeRecordBatch(batch, pool, &stream));
   *out = buffer;
   return Status::OK();
 }
 
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out) {
+  int32_t metadata_length = 0;
+  int64_t body_length = 0;
+  return WriteRecordBatch(batch, 0, out, &metadata_length, &body_length, pool,
+                          kMaxNestingDepth, true);
+}
+
 Status SerializeSchema(const Schema& schema, MemoryPool* pool,
                        std::shared_ptr<Buffer>* out) {
   std::shared_ptr<io::BufferOutputStream> stream;
diff --git a/cpp/src/arrow/ipc/writer.h b/cpp/src/arrow/ipc/writer.h
index d867982d2be02..3f110fe26fbdc 100644
--- a/cpp/src/arrow/ipc/writer.h
+++ b/cpp/src/arrow/ipc/writer.h
@@ -177,6 +177,18 @@ ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
                             std::shared_ptr<Buffer>* out);
 
+/// \brief Write record batch to OutputStream
+///
+/// \param[in] batch the record batch to write
+/// \param[in] out the OutputStream to write the output to
+/// \return Status
+///
+/// If writing to pre-allocated memory, you can use
+/// arrow::ipc::GetRecordBatchSize to compute how much space is required
+ARROW_EXPORT
+Status SerializeRecordBatch(const RecordBatch& batch, MemoryPool* pool,
+                            io::OutputStream* out);
+
 /// \brief Serialize schema using stream writer as a sequence of one or more
 /// IPC messages
 ///
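Usage note (not part of the patch): the sketch below shows how the pieces added here could fit together on the sending side. It is a minimal, illustrative sketch only; the function name ShareBatchOverIpc is hypothetical, device 0 and the default memory pool are arbitrary choices, and `batch` is assumed to be an already-constructed record batch. It uses only APIs that appear in the diff above (CudaDeviceManager::GetInstance/GetContext, gpu::SerializeRecordBatch, CudaBuffer::ExportForIpc, CudaIpcMemHandle::Serialize).

#include "arrow/api.h"
#include "arrow/gpu/cuda_api.h"

// Serialize `batch` into GPU device memory and produce a host buffer holding
// the serialized CUDA IPC handle. Another process would reconstruct the handle
// with CudaIpcMemHandle::FromBuffer and map the memory with
// CudaContext::OpenIpcBuffer, as in the DISABLED_ExportForIpc test above.
arrow::Status ShareBatchOverIpc(const arrow::RecordBatch& batch,
                                std::shared_ptr<arrow::Buffer>* out_handle) {
  arrow::gpu::CudaDeviceManager* manager;
  RETURN_NOT_OK(arrow::gpu::CudaDeviceManager::GetInstance(&manager));

  // Shared driver context for device 0 (arbitrary choice for this sketch)
  std::shared_ptr<arrow::gpu::CudaContext> context;
  RETURN_NOT_OK(manager->GetContext(0, &context));

  // Write the record batch message (metadata and body) into device memory
  std::shared_ptr<arrow::gpu::CudaBuffer> device_buffer;
  RETURN_NOT_OK(
      arrow::gpu::SerializeRecordBatch(batch, context.get(), &device_buffer));

  // Export the device buffer and serialize the IPC handle for transport
  // (socket, shared memory, etc.)
  std::unique_ptr<arrow::gpu::CudaIpcMemHandle> ipc_handle;
  RETURN_NOT_OK(device_buffer->ExportForIpc(&ipc_handle));
  return ipc_handle->Serialize(arrow::default_memory_pool(), out_handle);
}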