ARROW-7330: [C++] Migrate Arrow Cuda to Result<T>
Closes #6418 from pitrou/ARROW-7330-cuda-result and squashes the following commits:

86b5f7f <Sutou Kouhei>  Follow API change
5eb866f <Antoine Pitrou> ARROW-7330:  Migrate Arrow Cuda to Result<T>

Lead-authored-by: Antoine Pitrou <antoine@python.org>
Co-authored-by: Sutou Kouhei <kou@clear-code.com>
Signed-off-by: Sutou Kouhei <kou@clear-code.com>
pitrou and kou committed Feb 15, 2020
1 parent 0e12a5a commit 5411915
Showing 17 changed files with 446 additions and 270 deletions.
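For readers unfamiliar with the Result<T> idiom, the sketch below shows the general shape of the change made throughout this commit: out-parameter Status APIs gain Result-returning counterparts, and callers unwrap values with ARROW_ASSIGN_OR_RAISE. It is an illustration only; Thing, MakeThing, and UseThing are hypothetical names, and only arrow::Result, arrow::Status, Result::Value, and ARROW_ASSIGN_OR_RAISE come from Arrow itself.

// Illustrative sketch of the Status -> Result<T> migration pattern (hypothetical names).
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"

namespace example {

struct Thing {};

// Old style: the value is returned through an out-parameter.
arrow::Status MakeThing(int size, std::shared_ptr<Thing>* out) {
  if (size < 0) return arrow::Status::Invalid("negative size");
  *out = std::make_shared<Thing>();
  return arrow::Status::OK();
}

// New style: the value and the error travel together in arrow::Result<T>.
arrow::Result<std::shared_ptr<Thing>> MakeThing(int size) {
  if (size < 0) return arrow::Status::Invalid("negative size");
  return std::make_shared<Thing>();
}

// Callers unwrap with ARROW_ASSIGN_OR_RAISE; the old Status overload can be kept
// as a thin wrapper via Result<T>::Value(out), which is what this commit does.
arrow::Status UseThing() {
  ARROW_ASSIGN_OR_RAISE(auto thing, MakeThing(42));
  return thing ? arrow::Status::OK() : arrow::Status::UnknownError("unexpected null");
}

}  // namespace example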
97 changes: 39 additions & 58 deletions c_glib/arrow-cuda-glib/cuda.cpp
@@ -99,9 +99,8 @@ garrow_cuda_device_manager_class_init(GArrowCUDADeviceManagerClass *klass)
 GArrowCUDADeviceManager *
 garrow_cuda_device_manager_new(GError **error)
 {
-  arrow::cuda::CudaDeviceManager *manager;
-  auto status = arrow::cuda::CudaDeviceManager::GetInstance(&manager);
-  if (garrow_error_check(error, status, "[cuda][device-manager][new]")) {
+  auto arrow_manager = arrow::cuda::CudaDeviceManager::Instance();
+  if (garrow::check(error, arrow_manager, "[cuda][device-manager][new]")) {
     auto manager = g_object_new(GARROW_CUDA_TYPE_DEVICE_MANAGER,
                                 NULL);
     return GARROW_CUDA_DEVICE_MANAGER(manager);
@@ -127,13 +126,11 @@ garrow_cuda_device_manager_get_context(GArrowCUDADeviceManager *manager,
                                        gint gpu_number,
                                        GError **error)
 {
-  arrow::cuda::CudaDeviceManager *arrow_manager;
-  arrow::cuda::CudaDeviceManager::GetInstance(&arrow_manager);
-  std::shared_ptr<arrow::cuda::CudaContext> context;
-  auto status = arrow_manager->GetContext(gpu_number, &context);
-  if (garrow_error_check(error, status,
-                         "[cuda][device-manager][get-context]]")) {
-    return garrow_cuda_context_new_raw(&context);
+  auto arrow_manager = arrow::cuda::CudaDeviceManager::Instance();
+  auto arrow_cuda_context = (*arrow_manager)->GetContext(gpu_number);
+  if (garrow::check(error, arrow_cuda_context,
+                    "[cuda][device-manager][get-context]]")) {
+    return garrow_cuda_context_new_raw(&(*arrow_cuda_context));
   } else {
     return NULL;
   }
@@ -150,9 +147,8 @@ garrow_cuda_device_manager_get_context(GArrowCUDADeviceManager *manager,
 gsize
 garrow_cuda_device_manager_get_n_devices(GArrowCUDADeviceManager *manager)
 {
-  arrow::cuda::CudaDeviceManager *arrow_manager;
-  arrow::cuda::CudaDeviceManager::GetInstance(&arrow_manager);
-  return arrow_manager->num_devices();
+  auto arrow_manager = arrow::cuda::CudaDeviceManager::Instance();
+  return (*arrow_manager)->num_devices();
 }


@@ -291,10 +287,9 @@ garrow_cuda_buffer_new(GArrowCUDAContext *context,
                        GError **error)
 {
   auto arrow_context = garrow_cuda_context_get_raw(context);
-  std::shared_ptr<arrow::cuda::CudaBuffer> arrow_buffer;
-  auto status = arrow_context->Allocate(size, &arrow_buffer);
-  if (garrow_error_check(error, status, "[cuda][buffer][new]")) {
-    return garrow_cuda_buffer_new_raw(&arrow_buffer);
+  auto arrow_buffer = arrow_context->Allocate(size);
+  if (garrow::check(error, arrow_buffer, "[cuda][buffer][new]")) {
+    return garrow_cuda_buffer_new_raw(&(*arrow_buffer));
   } else {
     return NULL;
   }
@@ -318,11 +313,9 @@ garrow_cuda_buffer_new_ipc(GArrowCUDAContext *context,
 {
   auto arrow_context = garrow_cuda_context_get_raw(context);
   auto arrow_handle = garrow_cuda_ipc_memory_handle_get_raw(handle);
-  std::shared_ptr<arrow::cuda::CudaBuffer> arrow_buffer;
-  auto status = arrow_context->OpenIpcBuffer(*arrow_handle, &arrow_buffer);
-  if (garrow_error_check(error, status,
-                         "[cuda][buffer][new-ipc]")) {
-    return garrow_cuda_buffer_new_raw(&arrow_buffer);
+  auto arrow_buffer = arrow_context->OpenIpcBuffer(*arrow_handle);
+  if (garrow::check(error, arrow_buffer, "[cuda][buffer][new-ipc]")) {
+    return garrow_cuda_buffer_new_raw(&(*arrow_buffer));
   } else {
     return NULL;
   }
@@ -347,13 +340,10 @@ garrow_cuda_buffer_new_record_batch(GArrowCUDAContext *context,
 {
   auto arrow_context = garrow_cuda_context_get_raw(context);
   auto arrow_record_batch = garrow_record_batch_get_raw(record_batch);
-  std::shared_ptr<arrow::cuda::CudaBuffer> arrow_buffer;
-  auto status = arrow::cuda::SerializeRecordBatch(*arrow_record_batch,
-                                                  arrow_context.get(),
-                                                  &arrow_buffer);
-  if (garrow_error_check(error, status,
-                         "[cuda][buffer][new-record-batch]")) {
-    return garrow_cuda_buffer_new_raw(&arrow_buffer);
+  auto arrow_buffer = arrow::cuda::SerializeRecordBatch(*arrow_record_batch,
+                                                        arrow_context.get());
+  if (garrow::check(error, arrow_buffer, "[cuda][buffer][new-record-batch]")) {
+    return garrow_cuda_buffer_new_raw(&(*arrow_buffer));
   } else {
     return NULL;
   }
@@ -427,10 +417,9 @@ GArrowCUDAIPCMemoryHandle *
 garrow_cuda_buffer_export(GArrowCUDABuffer *buffer, GError **error)
 {
   auto arrow_buffer = garrow_cuda_buffer_get_raw(buffer);
-  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> arrow_handle;
-  auto status = arrow_buffer->ExportForIpc(&arrow_handle);
-  if (garrow_error_check(error, status, "[cuda][buffer][export-for-ipc]")) {
-    return garrow_cuda_ipc_memory_handle_new_raw(&arrow_handle);
+  auto arrow_handle = arrow_buffer->ExportForIpc();
+  if (garrow::check(error, arrow_handle, "[cuda][buffer][export-for-ipc]")) {
+    return garrow_cuda_ipc_memory_handle_new_raw(&(*arrow_handle));
   } else {
     return NULL;
   }
@@ -472,14 +461,12 @@ garrow_cuda_buffer_read_record_batch(GArrowCUDABuffer *buffer,
   auto arrow_buffer = garrow_cuda_buffer_get_raw(buffer);
   auto arrow_schema = garrow_schema_get_raw(schema);
   auto pool = arrow::default_memory_pool();
-  std::shared_ptr<arrow::RecordBatch> arrow_record_batch;
-  auto status = arrow::cuda::ReadRecordBatch(arrow_schema,
-                                             arrow_buffer,
-                                             pool,
-                                             &arrow_record_batch);
-  if (garrow_error_check(error, status,
-                         "[cuda][buffer][read-record-batch]")) {
-    return garrow_record_batch_new_raw(&arrow_record_batch);
+  auto arrow_record_batch = arrow::cuda::ReadRecordBatch(arrow_schema,
+                                                         arrow_buffer,
+                                                         pool);
+  if (garrow::check(error, arrow_record_batch,
+                    "[cuda][buffer][read-record-batch]")) {
+    return garrow_record_batch_new_raw(&(*arrow_record_batch));
   } else {
     return NULL;
   }
@@ -515,12 +502,10 @@ garrow_cuda_host_buffer_class_init(GArrowCUDAHostBufferClass *klass)
 GArrowCUDAHostBuffer *
 garrow_cuda_host_buffer_new(gint gpu_number, gint64 size, GError **error)
 {
-  arrow::cuda::CudaDeviceManager *manager;
-  auto status = arrow::cuda::CudaDeviceManager::GetInstance(&manager);
-  std::shared_ptr<arrow::cuda::CudaHostBuffer> arrow_buffer;
-  status = manager->AllocateHost(gpu_number, size, &arrow_buffer);
-  if (garrow_error_check(error, status, "[cuda][host-buffer][new]")) {
-    return garrow_cuda_host_buffer_new_raw(&arrow_buffer);
+  auto arrow_manager = arrow::cuda::CudaDeviceManager::Instance();
+  auto arrow_buffer = (*arrow_manager)->AllocateHost(gpu_number, size);
+  if (garrow::check(error, arrow_buffer, "[cuda][host-buffer][new]")) {
+    return garrow_cuda_host_buffer_new_raw(&(*arrow_buffer));
   } else {
     return NULL;
   }
@@ -631,11 +616,9 @@ garrow_cuda_ipc_memory_handle_new(const guint8 *data,
                                   gsize size,
                                   GError **error)
 {
-  std::shared_ptr<arrow::cuda::CudaIpcMemHandle> arrow_handle;
-  auto status = arrow::cuda::CudaIpcMemHandle::FromBuffer(data, &arrow_handle);
-  if (garrow_error_check(error, status,
-                         "[cuda][ipc-memory-handle][new]")) {
-    return garrow_cuda_ipc_memory_handle_new_raw(&arrow_handle);
+  auto arrow_handle = arrow::cuda::CudaIpcMemHandle::FromBuffer(data);
+  if (garrow::check(error, arrow_handle, "[cuda][ipc-memory-handle][new]")) {
+    return garrow_cuda_ipc_memory_handle_new_raw(&(*arrow_handle));
   } else {
     return NULL;
   }
@@ -658,12 +641,10 @@ garrow_cuda_ipc_memory_handle_serialize(GArrowCUDAIPCMemoryHandle *handle,
                                         GError **error)
 {
   auto arrow_handle = garrow_cuda_ipc_memory_handle_get_raw(handle);
-  std::shared_ptr<arrow::Buffer> arrow_buffer;
-  auto status = arrow_handle->Serialize(arrow::default_memory_pool(),
-                                        &arrow_buffer);
-  if (garrow_error_check(error, status,
-                         "[cuda][ipc-memory-handle][serialize]")) {
-    return garrow_buffer_new_raw(&arrow_buffer);
+  auto arrow_buffer = arrow_handle->Serialize(arrow::default_memory_pool());
+  if (garrow::check(error, arrow_buffer,
+                    "[cuda][ipc-memory-handle][serialize]")) {
+    return garrow_buffer_new_raw(&(*arrow_buffer));
   } else {
     return NULL;
   }
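A note on the repeated `(*arrow_manager)->` and `&(*arrow_buffer)` spellings above: the Result-returning APIs wrap the value, so `operator*` on the Result yields the underlying pointer or `std::shared_ptr`, which is then used as before once `garrow::check()` has translated any error into a GError. A minimal sketch of that access pattern, assuming the `garrow::check()` helper seen in this diff (the include paths and function name below are indicative, not taken from the commit):

// Minimal sketch, not a function in this file: unwrapping a Result in the GLib bindings.
#include <arrow-glib/error.hpp>   // garrow::check() (path indicative)
#include <arrow/gpu/cuda_api.h>   // arrow::cuda::CudaDeviceManager

gsize
example_count_cuda_devices(GError **error)
{
  // Result<CudaDeviceManager*>: Instance() no longer uses an out-parameter.
  auto maybe_manager = arrow::cuda::CudaDeviceManager::Instance();
  if (!garrow::check(error, maybe_manager, "[cuda][example][count-devices]")) {
    return 0;
  }
  // *maybe_manager yields the CudaDeviceManager*, then -> calls through it.
  return (*maybe_manager)->num_devices();
}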
17 changes: 7 additions & 10 deletions c_glib/plasma-glib/client.cpp
@@ -539,22 +539,19 @@ gplasma_client_refer_object(GPlasmaClient *client,
     metadata = garrow_buffer_new_raw(&plasma_metadata);
   } else {
 #ifdef HAVE_ARROW_CUDA
-    std::shared_ptr<arrow::cuda::CudaBuffer> plasma_cuda_data;
-    status = arrow::cuda::CudaBuffer::FromBuffer(plasma_data,
-                                                 &plasma_cuda_data);
-    if (!garrow_error_check(error, status, context)) {
+    auto plasma_cuda_data = arrow::cuda::CudaBuffer::FromBuffer(plasma_data);
+    if (!garrow::check(error, plasma_cuda_data, context)) {
       return NULL;
     }
-    std::shared_ptr<arrow::cuda::CudaBuffer> plasma_cuda_metadata;
-    status = arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata,
-                                                 &plasma_cuda_metadata);
-    if (!garrow_error_check(error, status, context)) {
+    auto plasma_cuda_metadata =
+      arrow::cuda::CudaBuffer::FromBuffer(plasma_metadata);
+    if (!garrow::check(error, plasma_cuda_metadata, context)) {
       return NULL;
     }
 
-    data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_data));
+    data = GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_data)));
     metadata =
-      GARROW_BUFFER(garrow_cuda_buffer_new_raw(&plasma_cuda_metadata));
+      GARROW_BUFFER(garrow_cuda_buffer_new_raw(&(*plasma_cuda_metadata)));
 #else
     g_set_error(error,
                 GARROW_ERROR,
27 changes: 20 additions & 7 deletions cpp/src/arrow/gpu/cuda_arrow_ipc.cc
@@ -42,21 +42,26 @@ namespace flatbuf = org::apache::arrow::flatbuf;
 
 namespace cuda {
 
-Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
-                            std::shared_ptr<CudaBuffer>* out) {
+Result<std::shared_ptr<CudaBuffer>> SerializeRecordBatch(const RecordBatch& batch,
+                                                         CudaContext* ctx) {
   ARROW_ASSIGN_OR_RAISE(auto buf,
                         ipc::SerializeRecordBatch(batch, ctx->memory_manager()));
-  return CudaBuffer::FromBuffer(buf, out);
+  return CudaBuffer::FromBuffer(buf);
 }
 
+Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
+                            std::shared_ptr<CudaBuffer>* out) {
+  return SerializeRecordBatch(batch, ctx).Value(out);
+}
+
 Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
                    std::unique_ptr<ipc::Message>* out) {
   return ipc::ReadMessage(reader, pool).Value(out);
 }
 
-Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
-                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
-                       std::shared_ptr<RecordBatch>* out) {
+Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
+    const std::shared_ptr<Schema>& schema, const std::shared_ptr<CudaBuffer>& buffer,
+    MemoryPool* pool) {
   CudaBufferReader cuda_reader(buffer);
 
   // The pool is only used for metadata allocation
@@ -67,7 +72,15 @@ Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
 
   // Zero-copy read on device memory
   ipc::DictionaryMemo unused_memo;
-  return ipc::ReadRecordBatch(*message, schema, &unused_memo, out);
+  std::shared_ptr<RecordBatch> batch;
+  RETURN_NOT_OK(ipc::ReadRecordBatch(*message, schema, &unused_memo, &batch));
+  return batch;
 }
+
+Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
+                       const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
+                       std::shared_ptr<RecordBatch>* out) {
+  return ReadRecordBatch(schema, buffer, pool).Value(out);
+}
 
 }  // namespace cuda
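The deprecated Status overloads above are kept as one-line shims over the new Result-returning functions via Result<T>::Value(out), which moves the value into the out-parameter on success and returns the embedded error otherwise. A small hedged sketch of that bridging idiom outside of Arrow's own sources (Answer and Compute are made-up names; only arrow::Result, arrow::Status, and Result::Value come from Arrow):

// Sketch of the Status-shim idiom used in cuda_arrow_ipc.cc (hypothetical names).
#include <memory>

#include "arrow/result.h"
#include "arrow/status.h"

struct Answer {};

// New API: the value travels inside the Result.
arrow::Result<std::shared_ptr<Answer>> Compute() {
  return std::make_shared<Answer>();
}

// Backward-compatible wrapper: Result<T>::Value(out) fills *out on success and
// returns the embedded Status on failure.
arrow::Status Compute(std::shared_ptr<Answer>* out) {
  return Compute().Value(out);
}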
25 changes: 22 additions & 3 deletions cpp/src/arrow/gpu/cuda_arrow_ipc.h
@@ -22,7 +22,7 @@
 #include <memory>
 
 #include "arrow/buffer.h"
-#include "arrow/status.h"
+#include "arrow/type_fwd.h"
 #include "arrow/util/visibility.h"
 
 #include "arrow/gpu/cuda_memory.h"
@@ -41,33 +41,52 @@ class Message;
 
 namespace cuda {
 
+/// \brief Write record batch message to GPU device memory
+/// \param[in] batch record batch to write
+/// \param[in] ctx CudaContext to allocate device memory from
+/// \return CudaBuffer or Status
+ARROW_EXPORT
+Result<std::shared_ptr<CudaBuffer>> SerializeRecordBatch(const RecordBatch& batch,
+                                                         CudaContext* ctx);
+
 /// \brief Write record batch message to GPU device memory
 /// \param[in] batch record batch to write
 /// \param[in] ctx CudaContext to allocate device memory from
 /// \param[out] out the returned device buffer which contains the record batch message
 /// \return Status
+ARROW_DEPRECATED("Use Result-returning version")
 ARROW_EXPORT
 Status SerializeRecordBatch(const RecordBatch& batch, CudaContext* ctx,
                             std::shared_ptr<CudaBuffer>* out);
 
-// TODO deprecate for ipc::ReadMessage()?
-
 /// \brief Read Arrow IPC message located on GPU device
 /// \param[in] reader a CudaBufferReader
 /// \param[in] pool a MemoryPool to allocate CPU memory for the metadata
 /// \param[out] message the deserialized message, body still on device
 ///
 /// This function reads the message metadata into host memory, but leaves the
 /// message body on the device
+ARROW_DEPRECATED("Use arrow::ipc::ReadMessage")
 ARROW_EXPORT
 Status ReadMessage(CudaBufferReader* reader, MemoryPool* pool,
                    std::unique_ptr<ipc::Message>* message);
 
+/// \brief ReadRecordBatch specialized to handle metadata on CUDA device
+/// \param[in] schema the Schema for the record batch
+/// \param[in] buffer a CudaBuffer containing the complete IPC message
+/// \param[in] pool a MemoryPool to use for allocating space for the metadata
+/// \return RecordBatch or Status
+ARROW_EXPORT
+Result<std::shared_ptr<RecordBatch>> ReadRecordBatch(
+    const std::shared_ptr<Schema>& schema, const std::shared_ptr<CudaBuffer>& buffer,
+    MemoryPool* pool = default_memory_pool());
+
 /// \brief ReadRecordBatch specialized to handle metadata on CUDA device
 /// \param[in] schema the Schema for the record batch
 /// \param[in] buffer a CudaBuffer containing the complete IPC message
 /// \param[in] pool a MemoryPool to use for allocating space for the metadata
 /// \param[out] out the reconstructed RecordBatch, with device pointers
+ARROW_DEPRECATED("Use Result-returning version")
 ARROW_EXPORT
 Status ReadRecordBatch(const std::shared_ptr<Schema>& schema,
                        const std::shared_ptr<CudaBuffer>& buffer, MemoryPool* pool,
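With the declarations above, callers can consume the CUDA IPC helpers through Result<T> directly instead of out-parameters. The sketch below is an illustrative caller, not part of the commit; WriteAndReadBack and its arguments are hypothetical, while the SerializeRecordBatch and ReadRecordBatch signatures match the header shown above.

// Illustrative caller of the new Result-returning API declared above.
#include <memory>

#include "arrow/gpu/cuda_api.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"

arrow::Status WriteAndReadBack(const arrow::RecordBatch& batch,
                               arrow::cuda::CudaContext* context) {
  // Serialize the batch into device memory; the Result carries the CudaBuffer.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::cuda::CudaBuffer> device_buffer,
                        arrow::cuda::SerializeRecordBatch(batch, context));
  // Reconstruct the batch from the device buffer (metadata is read on the host,
  // the body stays on the device).
  ARROW_ASSIGN_OR_RAISE(auto roundtripped,
                        arrow::cuda::ReadRecordBatch(batch.schema(), device_buffer));
  return roundtripped->Validate();
}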
