Skip to content

Commit

Permalink
ARROW-2447: [C++] Device and MemoryManager API
Browse files Browse the repository at this point in the history
Add an abstraction layer to allow safe handling of buffers residing on different devices (the CPU, a GPU...). The layer exposes two interfaces:
* the `Device` interface exposes information a particular memory-holding device
* the `MemoryManager` allows allocating, copying, reading or writing memory located on a particular device

The `Buffer` API is modified so that calling `data()` fails on non-CPU buffers. A separate `address()` method returns the buffer address as an integer, and is allowed on any buffer.

The API provides convenience functions to view or copy a buffer from one device to the other. For example, a on-GPU buffer can be copied to the CPU, and in some situations a zero-copy CPU view can also be created (depending on the GPU capabilities and how the GPU memory was allocated).

An example use in the PR is IPC. On the write side, a new `SerializeRecordBatch` overload takes a `MemoryManager` argument and is able to serialize data either to any kind of memory (CPU, GPU). On the read side, `ReadRecordBatch` now works on any kind of input buffer, and returns record batches backed by either CPU or GPU memory.

It introduces a slight complexity in the CUDA namespace, since there are both `CudaContext` and `CudaMemoryManager` classes. We could solve this by merging the two concepts (but doing so may break compatibility for existing users of CUDA).

Closes #6295 from pitrou/ARROW-2447-device-api-memory-manager and squashes the following commits:

c665f61 <Antoine Pitrou> ARROW-2447:  Device and MemoryManager API

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Wes McKinney <wesm+git@apache.org>
  • Loading branch information
pitrou authored and wesm committed Feb 12, 2020
1 parent 431fdbf commit 9f0c70c
Show file tree
Hide file tree
Showing 35 changed files with 2,245 additions and 424 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ set(ARROW_SRCS
array/validate.cc
buffer.cc
compare.cc
device.cc
extension_type.cc
memory_pool.cc
pretty_print.cc
Expand Down
112 changes: 83 additions & 29 deletions cpp/src/arrow/buffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,44 @@ Status Buffer::FromString(const std::string& data, std::shared_ptr<Buffer>* out)
return FromString(data, default_memory_pool(), out);
}

std::string Buffer::ToString() const {
return std::string(reinterpret_cast<const char*>(data_), static_cast<size_t>(size_));
}

void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; }

void Buffer::CheckCPU() const {
DCHECK(is_cpu()) << "not a CPU buffer (device: " << device()->ToString() << ")";
}

Result<std::shared_ptr<io::RandomAccessFile>> Buffer::GetReader(
std::shared_ptr<Buffer> buf) {
return buf->memory_manager_->GetBufferReader(buf);
}

Result<std::shared_ptr<io::OutputStream>> Buffer::GetWriter(std::shared_ptr<Buffer> buf) {
return buf->memory_manager_->GetBufferWriter(buf);
}

Result<std::shared_ptr<Buffer>> Buffer::Copy(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to) {
return MemoryManager::CopyBuffer(source, to);
}

Result<std::shared_ptr<Buffer>> Buffer::View(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to) {
return MemoryManager::ViewBuffer(source, to);
}

Result<std::shared_ptr<Buffer>> Buffer::ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>& to) {
auto maybe_buffer = MemoryManager::ViewBuffer(source, to);
if (maybe_buffer.ok()) {
return maybe_buffer;
}
return MemoryManager::CopyBuffer(source, to);
}

class StlStringBuffer : public Buffer {
public:
explicit StlStringBuffer(std::string&& data)
Expand All @@ -94,21 +132,27 @@ std::shared_ptr<Buffer> Buffer::FromString(std::string&& data) {
return std::make_shared<StlStringBuffer>(std::move(data));
}

std::string Buffer::ToString() const {
return std::string(reinterpret_cast<const char*>(data_), static_cast<size_t>(size_));
std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
const int64_t offset, const int64_t length) {
return std::make_shared<MutableBuffer>(buffer, offset, length);
}

void Buffer::CheckMutable() const { DCHECK(is_mutable()) << "buffer not mutable"; }
MutableBuffer::MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_t offset,
const int64_t size)
: MutableBuffer(reinterpret_cast<uint8_t*>(parent->mutable_address()) + offset,
size) {
DCHECK(parent->is_mutable()) << "Must pass mutable buffer";
parent_ = parent;
}

// -----------------------------------------------------------------------
// Pool buffer and allocation

/// A Buffer whose lifetime is tied to a particular MemoryPool
class PoolBuffer : public ResizableBuffer {
public:
explicit PoolBuffer(MemoryPool* pool) : ResizableBuffer(nullptr, 0) {
if (pool == nullptr) {
pool = default_memory_pool();
}
pool_ = pool;
}
explicit PoolBuffer(std::shared_ptr<MemoryManager> mm, MemoryPool* pool)
: ResizableBuffer(nullptr, 0, std::move(mm)), pool_(pool) {}

~PoolBuffer() override {
if (mutable_data_ != nullptr) {
Expand Down Expand Up @@ -157,22 +201,32 @@ class PoolBuffer : public ResizableBuffer {
return Status::OK();
}

static std::shared_ptr<PoolBuffer> MakeShared(MemoryPool* pool) {
std::shared_ptr<MemoryManager> mm;
if (pool == nullptr) {
pool = default_memory_pool();
mm = default_cpu_memory_manager();
} else {
mm = CPUDevice::memory_manager(pool);
}
return std::make_shared<PoolBuffer>(std::move(mm), pool);
}

static std::unique_ptr<PoolBuffer> MakeUnique(MemoryPool* pool) {
std::shared_ptr<MemoryManager> mm;
if (pool == nullptr) {
pool = default_memory_pool();
mm = default_cpu_memory_manager();
} else {
mm = CPUDevice::memory_manager(pool);
}
return std::unique_ptr<PoolBuffer>(new PoolBuffer(std::move(mm), pool));
}

private:
MemoryPool* pool_;
};

std::shared_ptr<Buffer> SliceMutableBuffer(const std::shared_ptr<Buffer>& buffer,
const int64_t offset, const int64_t length) {
return std::make_shared<MutableBuffer>(buffer, offset, length);
}

MutableBuffer::MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_t offset,
const int64_t size)
: MutableBuffer(parent->mutable_data() + offset, size) {
DCHECK(parent->is_mutable()) << "Must pass mutable buffer";
parent_ = parent;
}

namespace {
// A utility that does most of the work of the `AllocateBuffer` and
// `AllocateResizableBuffer` methods. The argument `buffer` should be a smart pointer to a
Expand All @@ -189,40 +243,40 @@ inline Status ResizePoolBuffer(PoolBufferPtr&& buffer, const int64_t size,

Status AllocateBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<Buffer>* out) {
return ResizePoolBuffer(std::make_shared<PoolBuffer>(pool), size, out);
return ResizePoolBuffer(PoolBuffer::MakeShared(pool), size, out);
}

Status AllocateBuffer(MemoryPool* pool, const int64_t size,
std::unique_ptr<Buffer>* out) {
return ResizePoolBuffer(std::unique_ptr<PoolBuffer>(new PoolBuffer(pool)), size, out);
return ResizePoolBuffer(PoolBuffer::MakeUnique(pool), size, out);
}

Status AllocateBuffer(const int64_t size, std::shared_ptr<Buffer>* out) {
return AllocateBuffer(default_memory_pool(), size, out);
return AllocateBuffer(nullptr, size, out);
}

Status AllocateBuffer(const int64_t size, std::unique_ptr<Buffer>* out) {
return AllocateBuffer(default_memory_pool(), size, out);
return AllocateBuffer(nullptr, size, out);
}

Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size,
std::shared_ptr<ResizableBuffer>* out) {
return ResizePoolBuffer(std::make_shared<PoolBuffer>(pool), size, out);
return ResizePoolBuffer(PoolBuffer::MakeShared(pool), size, out);
}

Status AllocateResizableBuffer(MemoryPool* pool, const int64_t size,
std::unique_ptr<ResizableBuffer>* out) {
return ResizePoolBuffer(std::unique_ptr<PoolBuffer>(new PoolBuffer(pool)), size, out);
return ResizePoolBuffer(PoolBuffer::MakeUnique(pool), size, out);
}

Status AllocateResizableBuffer(const int64_t size,
std::shared_ptr<ResizableBuffer>* out) {
return AllocateResizableBuffer(default_memory_pool(), size, out);
return AllocateResizableBuffer(nullptr, size, out);
}

Status AllocateResizableBuffer(const int64_t size,
std::unique_ptr<ResizableBuffer>* out) {
return AllocateResizableBuffer(default_memory_pool(), size, out);
return AllocateResizableBuffer(nullptr, size, out);
}

Status AllocateBitmap(MemoryPool* pool, int64_t length, std::shared_ptr<Buffer>* out) {
Expand Down
112 changes: 102 additions & 10 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
// specific language governing permissions and limitations
// under the License.

#ifndef ARROW_BUFFER_H
#define ARROW_BUFFER_H
#pragma once

#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/device.h"
#include "arrow/io/type_fwd.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"
Expand Down Expand Up @@ -57,10 +59,29 @@ class ARROW_EXPORT Buffer {
/// \note The passed memory must be kept alive through some other means
Buffer(const uint8_t* data, int64_t size)
: is_mutable_(false),
is_cpu_(true),
data_(data),
mutable_data_(NULLPTR),
size_(size),
capacity_(size) {}
capacity_(size) {
SetMemoryManager(default_cpu_memory_manager());
}

Buffer(const uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm,
std::shared_ptr<Buffer> parent = NULLPTR)
: is_mutable_(false),
data_(data),
mutable_data_(NULLPTR),
size_(size),
capacity_(size),
parent_(parent) {
SetMemoryManager(std::move(mm));
}

Buffer(uintptr_t address, int64_t size, std::shared_ptr<MemoryManager> mm,
std::shared_ptr<Buffer> parent = NULLPTR)
: Buffer(reinterpret_cast<const uint8_t*>(address), size, std::move(mm),
std::move(parent)) {}

/// \brief Construct from string_view without copying memory
///
Expand All @@ -82,8 +103,9 @@ class ARROW_EXPORT Buffer {
/// in general we expected buffers to be aligned and padded to 64 bytes. In the future
/// we might add utility methods to help determine if a buffer satisfies this contract.
Buffer(const std::shared_ptr<Buffer>& parent, const int64_t offset, const int64_t size)
: Buffer(parent->data() + offset, size) {
: Buffer(parent->data_ + offset, size) {
parent_ = parent;
SetMemoryManager(parent->memory_manager_);
}

uint8_t operator[](std::size_t i) const { return data_[i]; }
Expand Down Expand Up @@ -178,16 +200,46 @@ class ARROW_EXPORT Buffer {
explicit operator util::bytes_view() const { return util::bytes_view(data_, size_); }

/// \brief Return a pointer to the buffer's data
const uint8_t* data() const { return data_; }
///
/// The buffer has to be a CPU buffer (`is_cpu()` is true).
/// Otherwise, an assertion may be thrown or a null pointer may be returned.
///
/// To get the buffer's data address regardless of its device, call `address()`.
const uint8_t* data() const {
#ifndef NDEBUG
CheckCPU();
#endif
return ARROW_PREDICT_TRUE(is_cpu_) ? data_ : NULLPTR;
}

/// \brief Return a writable pointer to the buffer's data
///
/// The buffer has to be mutable. Otherwise, an assertion may be thrown
/// or a null pointer may be returned.
/// The buffer has to be a mutable CPU buffer (`is_cpu()` and `is_mutable()`
/// are true). Otherwise, an assertion may be thrown or a null pointer may
/// be returned.
///
/// To get the buffer's mutable data address regardless of its device, call
/// `mutable_address()`.
uint8_t* mutable_data() {
#ifndef NDEBUG
CheckCPU();
CheckMutable();
#endif
return ARROW_PREDICT_TRUE(is_cpu_) ? mutable_data_ : NULLPTR;
}

/// \brief Return the device address of the buffer's data
uintptr_t address() const { return reinterpret_cast<uintptr_t>(data_); }

/// \brief Return a writable device address to the buffer's data
///
/// The buffer has to be a mutable buffer (`is_mutable()` is true).
/// Otherwise, an assertion may be thrown or 0 may be returned.
uintptr_t mutable_address() const {
#ifndef NDEBUG
CheckMutable();
#endif
return mutable_data_;
return reinterpret_cast<uintptr_t>(mutable_data_);
}

/// \brief Return the buffer's size in bytes
Expand All @@ -196,10 +248,32 @@ class ARROW_EXPORT Buffer {
/// \brief Return the buffer's capacity (number of allocated bytes)
int64_t capacity() const { return capacity_; }

/// \brief Whether the buffer is directly CPU-accessible
///
/// If this function returns true, you can read directly from the buffer's
/// `data()` pointer. Otherwise, you'll have to `View()` or `Copy()` it.
bool is_cpu() const { return is_cpu_; }

const std::shared_ptr<Device>& device() const { return memory_manager_->device(); }

const std::shared_ptr<MemoryManager>& memory_manager() const { return memory_manager_; }

std::shared_ptr<Buffer> parent() const { return parent_; }

// Convenience functions
static Result<std::shared_ptr<io::RandomAccessFile>> GetReader(std::shared_ptr<Buffer>);
static Result<std::shared_ptr<io::OutputStream>> GetWriter(std::shared_ptr<Buffer>);

static Result<std::shared_ptr<Buffer>> Copy(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to);
static Result<std::shared_ptr<Buffer>> View(std::shared_ptr<Buffer> source,
const std::shared_ptr<MemoryManager>& to);
static Result<std::shared_ptr<Buffer>> ViewOrCopy(
std::shared_ptr<Buffer> source, const std::shared_ptr<MemoryManager>& to);

protected:
bool is_mutable_;
bool is_cpu_;
const uint8_t* data_;
uint8_t* mutable_data_;
int64_t size_;
Expand All @@ -208,9 +282,21 @@ class ARROW_EXPORT Buffer {
// null by default, but may be set
std::shared_ptr<Buffer> parent_;

private:
// private so that subclasses are forced to call SetMemoryManager()
std::shared_ptr<MemoryManager> memory_manager_;

protected:
void CheckMutable() const;
void CheckCPU() const;

void SetMemoryManager(std::shared_ptr<MemoryManager> mm) {
memory_manager_ = std::move(mm);
is_cpu_ = memory_manager_->is_cpu();
}

private:
Buffer() = delete;
ARROW_DISALLOW_COPY_AND_ASSIGN(Buffer);
};

Expand Down Expand Up @@ -267,6 +353,12 @@ class ARROW_EXPORT MutableBuffer : public Buffer {
is_mutable_ = true;
}

MutableBuffer(uint8_t* data, const int64_t size, std::shared_ptr<MemoryManager> mm)
: Buffer(data, size, std::move(mm)) {
mutable_data_ = data;
is_mutable_ = true;
}

MutableBuffer(const std::shared_ptr<Buffer>& parent, const int64_t offset,
const int64_t size);

Expand Down Expand Up @@ -315,6 +407,8 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {

protected:
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm)
: MutableBuffer(data, size, std::move(mm)) {}
};

/// \defgroup buffer-allocation-functions Functions for allocating buffers
Expand Down Expand Up @@ -444,5 +538,3 @@ Status ConcatenateBuffers(const BufferVector& buffers, MemoryPool* pool,
/// @}

} // namespace arrow

#endif // ARROW_BUFFER_H
Loading

0 comments on commit 9f0c70c

Please sign in to comment.