Skip to content

Commit

Permalink
GH-36103: [C++] Initial device sync API (#37040)
Browse files Browse the repository at this point in the history
### Rationale for this change
Building on the `ArrowDeviceArray` we need to expand the abstractions for handling events and stream synchronization for devices.

### What changes are included in this PR?
Initial Abstract implementations for the new DeviceSync API and a CPU implementation. This will be followed up by a CUDA implementation in a subsequent PR.

### Are these changes tested?
Yes, tests are added for Import/Export DeviceArrays using the DeviceSync handling.

* Closes: #36103

Lead-authored-by: Matt Topol <zotthewizard@gmail.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Antoine Pitrou <pitrou@free.fr>
Signed-off-by: Matt Topol <zotthewizard@gmail.com>
  • Loading branch information
3 people committed Aug 22, 2023
1 parent 702e9ca commit d062c89
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 73 deletions.
2 changes: 2 additions & 0 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ class ARROW_EXPORT Buffer {
static Result<std::shared_ptr<Buffer>> ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>& to);

virtual std::shared_ptr<Device::SyncEvent> device_sync_event() { return NULLPTR; }

protected:
bool is_mutable_;
bool is_cpu_;
Expand Down
44 changes: 22 additions & 22 deletions cpp/src/arrow/c/bridge.cc
Original file line number Diff line number Diff line change
Expand Up @@ -521,8 +521,7 @@ struct ExportedArrayPrivateData : PoolAllocationMixin<ExportedArrayPrivateData>
SmallVector<struct ArrowArray*, 4> child_pointers_;

std::shared_ptr<ArrayData> data_;

RawSyncEvent sync_event_;
std::shared_ptr<Device::SyncEvent> sync_;

ExportedArrayPrivateData() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ExportedArrayPrivateData);
Expand All @@ -547,10 +546,6 @@ void ReleaseExportedArray(struct ArrowArray* array) {
}
DCHECK_NE(array->private_data, nullptr);
auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(array->private_data);
if (pdata->sync_event_.sync_event != nullptr &&
pdata->sync_event_.release_func != nullptr) {
pdata->sync_event_.release_func(pdata->sync_event_.sync_event);
}
delete pdata;

ArrowArrayMarkReleased(array);
Expand Down Expand Up @@ -591,7 +586,7 @@ struct ArrayExporter {
// Store owning pointer to ArrayData
export_.data_ = data;

export_.sync_event_ = RawSyncEvent();
export_.sync_ = nullptr;
return Status::OK();
}

Expand Down Expand Up @@ -714,12 +709,9 @@ Result<std::pair<std::optional<DeviceAllocationType>, int64_t>> ValidateDeviceIn
return std::make_pair(device_type, device_id);
}

Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
Status ExportDeviceArray(const Array& array, std::shared_ptr<Device::SyncEvent> sync,
struct ArrowDeviceArray* out, struct ArrowSchema* out_schema) {
if (sync_event.sync_event != nullptr && sync_event.release_func) {
return Status::Invalid(
"Must provide a release event function if providing a non-null event");
}
void* sync_event = sync ? sync->get_raw() : nullptr;

SchemaExportGuard guard(out_schema);
if (out_schema != nullptr) {
Expand All @@ -739,19 +731,20 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
exporter.Finish(&out->array);

auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
pdata->sync_event_ = sync_event;
out->sync_event = sync_event.sync_event;
pdata->sync_ = std::move(sync);
out->sync_event = sync_event;

guard.Detach();
return Status::OK();
}

Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
Status ExportDeviceRecordBatch(const RecordBatch& batch,
std::shared_ptr<Device::SyncEvent> sync,
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema) {
if (sync_event.sync_event != nullptr && sync_event.release_func == nullptr) {
return Status::Invalid(
"Must provide a release event function if providing a non-null event");
void* sync_event{nullptr};
if (sync) {
sync_event = sync->get_raw();
}

// XXX perhaps bypass ToStructArray for speed?
Expand All @@ -776,8 +769,8 @@ Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event
exporter.Finish(&out->array);

auto* pdata = reinterpret_cast<ExportedArrayPrivateData*>(out->array.private_data);
pdata->sync_event_ = sync_event;
out->sync_event = sync_event.sync_event;
pdata->sync_ = std::move(sync);
out->sync_event = sync_event;

guard.Detach();
return Status::OK();
Expand Down Expand Up @@ -1362,7 +1355,7 @@ namespace {
// The ArrowArray is released on destruction.
struct ImportedArrayData {
struct ArrowArray array_;
void* sync_event_;
std::shared_ptr<Device::SyncEvent> device_sync_;

ImportedArrayData() {
ArrowArrayMarkReleased(&array_); // Initially released
Expand Down Expand Up @@ -1395,6 +1388,10 @@ class ImportedBuffer : public Buffer {

~ImportedBuffer() override {}

std::shared_ptr<Device::SyncEvent> device_sync_event() override {
return import_->device_sync_;
}

protected:
std::shared_ptr<ImportedArrayData> import_;
};
Expand All @@ -1409,7 +1406,10 @@ struct ArrayImporter {
ARROW_ASSIGN_OR_RAISE(memory_mgr_, mapper(src->device_type, src->device_id));
device_type_ = static_cast<DeviceAllocationType>(src->device_type);
RETURN_NOT_OK(Import(&src->array));
import_->sync_event_ = src->sync_event;
if (src->sync_event != nullptr) {
ARROW_ASSIGN_OR_RAISE(import_->device_sync_, memory_mgr_->WrapDeviceSyncEvent(
src->sync_event, [](void*) {}));
}
// reset internal state before next import
memory_mgr_.reset();
device_type_ = DeviceAllocationType::kCPU;
Expand Down
29 changes: 10 additions & 19 deletions cpp/src/arrow/c/bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <string>

#include "arrow/c/abi.h"
#include "arrow/device.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
Expand Down Expand Up @@ -172,33 +173,22 @@ Result<std::shared_ptr<RecordBatch>> ImportRecordBatch(struct ArrowArray* array,
///
/// @{

/// \brief EXPERIMENTAL: Type for freeing a sync event
///
/// If synchronization is necessary for accessing the data on a device,
/// a pointer to an event needs to be passed when exporting the device
/// array. It's the responsibility of the release function for the array
/// to release the event. Both can be null if no sync'ing is necessary.
struct RawSyncEvent {
void* sync_event = NULL;
std::function<void(void*)> release_func;
};

/// \brief EXPERIMENTAL: Export C++ Array as an ArrowDeviceArray.
///
/// The resulting ArrowDeviceArray struct keeps the array data and buffers alive
/// until its release callback is called by the consumer. All buffers in
/// the provided array MUST have the same device_type, otherwise an error
/// will be returned.
///
/// If a non-null sync_event is provided, then the sync_release func must also be
/// non-null. If the sync_event is null, then the sync_release parameter is not called.
/// If sync is non-null, get_event will be called on it in order to
/// potentially provide an event for consumers to synchronize on.
///
/// \param[in] array Array object to export
/// \param[in] sync_event A struct containing what is needed for syncing if necessary
/// \param[in] sync shared_ptr to object derived from Device::SyncEvent or null
/// \param[out] out C struct to export the array to
/// \param[out] out_schema optional C struct to export the array type to
ARROW_EXPORT
Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
Status ExportDeviceArray(const Array& array, std::shared_ptr<Device::SyncEvent> sync,
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema = NULLPTR);

Expand All @@ -212,15 +202,16 @@ Status ExportDeviceArray(const Array& array, RawSyncEvent sync_event,
/// otherwise an error will be returned. If columns are on different devices,
/// they should be exported using different ArrowDeviceArray instances.
///
/// If a non-null sync_event is provided, then the sync_release func must also be
/// non-null. If the sync_event is null, then the sync_release parameter is ignored.
/// If sync is non-null, get_event will be called on it in order to
/// potentially provide an event for consumers to synchronize on.
///
/// \param[in] batch Record batch to export
/// \param[in] sync_event A struct containing what is needed for syncing if necessary
/// \param[in] sync shared_ptr to object derived from Device::SyncEvent or null
/// \param[out] out C struct where to export the record batch
/// \param[out] out_schema optional C struct where to export the record batch schema
ARROW_EXPORT
Status ExportDeviceRecordBatch(const RecordBatch& batch, RawSyncEvent sync_event,
Status ExportDeviceRecordBatch(const RecordBatch& batch,
std::shared_ptr<Device::SyncEvent> sync,
struct ArrowDeviceArray* out,
struct ArrowSchema* out_schema = NULLPTR);

Expand Down
Loading

0 comments on commit d062c89

Please sign in to comment.