Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 36 additions & 31 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,13 @@ using arrow::gpu::CudaDeviceManager;

#define XXH64_DEFAULT_SEED 0

namespace fb = plasma::flatbuf;

namespace plasma {

using fb::MessageType;
using fb::PlasmaError;

using arrow::MutableBuffer;

typedef struct XXH64_state_s XXH64_state_t;
Expand Down Expand Up @@ -225,17 +230,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
ObjectBuffer* object_buffers);

uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
uint8_t* LookupOrMmap(int fd, int store_fd_val, int64_t map_size);

uint8_t* lookup_mmapped_file(int store_fd_val);
uint8_t* LookupMmappedFile(int store_fd_val);

void increment_object_count(const ObjectID& object_id, PlasmaObject* object,
bool is_sealed);
void IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object,
bool is_sealed);

bool compute_object_hash_parallel(XXH64_state_t* hash_state, const unsigned char* data,
int64_t nbytes);
bool ComputeObjectHashParallel(XXH64_state_t* hash_state, const unsigned char* data,
int64_t nbytes);

uint64_t compute_object_hash(const ObjectBuffer& obj_buffer);
uint64_t ComputeObjectHash(const ObjectBuffer& obj_buffer);

/// File descriptor of the Unix domain socket that connects to the store.
int store_conn_;
Expand Down Expand Up @@ -284,7 +289,7 @@ PlasmaClient::Impl::~Impl() {}
// If the file descriptor fd has been mmapped in this client process before,
// return the pointer that was returned by mmap, otherwise mmap it and store the
// pointer in a hash table.
uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) {
uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
Expand All @@ -310,7 +315,7 @@ uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t ma

// Get a pointer to a file that we know has been memory mapped in this client
// process before.
uint8_t* PlasmaClient::Impl::lookup_mmapped_file(int store_fd_val) {
uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
return entry->second.pointer;
Expand All @@ -321,8 +326,8 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
return (elem != objects_in_use_.end());
}

void PlasmaClient::Impl::increment_object_count(const ObjectID& object_id,
PlasmaObject* object, bool is_sealed) {
void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
PlasmaObject* object, bool is_sealed) {
// Increment the count of the object to track the fact that it is being used.
// The corresponding decrement should happen in PlasmaClient::Release.
auto elem = objects_in_use_.find(object_id);
Expand Down Expand Up @@ -383,7 +388,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<MutableBuffer>(
lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
Expand Down Expand Up @@ -414,13 +419,13 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// client is using. A call to PlasmaClient::Release is required to decrement
// this
// count. Cache the reference to the object.
increment_object_count(object_id, &object, false);
IncrementObjectCount(object_id, &object, false);
// We increment the count a second time (and the corresponding decrement will
// happen in a PlasmaClient::Release call in plasma_seal) so even if the
// buffer
// returned by PlasmaClient::Dreate goes out of scope, the object does not get
// released before the call to PlasmaClient::Seal happens.
increment_object_count(object_id, &object, false);
IncrementObjectCount(object_id, &object, false);
return Status::OK();
}

Expand All @@ -446,7 +451,7 @@ Status PlasmaClient::Impl::GetBuffers(
std::shared_ptr<Buffer> physical_buf;

if (object->device_num == 0) {
uint8_t* data = lookup_mmapped_file(object->store_fd);
uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
Expand All @@ -463,7 +468,7 @@ Status PlasmaClient::Impl::GetBuffers(
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
increment_object_count(object_ids[i], object, true);
IncrementObjectCount(object_ids[i], object, true);
}
}

Expand All @@ -490,7 +495,7 @@ Status PlasmaClient::Impl::GetBuffers(
for (size_t i = 0; i < store_fds.size(); i++) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0);
lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
}

for (int64_t i = 0; i < num_objects; ++i) {
Expand All @@ -509,7 +514,7 @@ Status PlasmaClient::Impl::GetBuffers(
if (object->data_size != -1) {
std::shared_ptr<Buffer> physical_buf;
if (object->device_num == 0) {
uint8_t* data = lookup_mmapped_file(object->store_fd);
uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
Expand Down Expand Up @@ -539,7 +544,7 @@ Status PlasmaClient::Impl::GetBuffers(
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
increment_object_count(received_object_ids[i], object, true);
IncrementObjectCount(received_object_ids[i], object, true);
} else {
// The object was not retrieved. The caller can detect this condition
// by checking the boolean value of the metadata/data buffers.
Expand Down Expand Up @@ -693,9 +698,9 @@ static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t
*hash = XXH64_digest(&hash_state);
}

bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
const unsigned char* data,
int64_t nbytes) {
bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state,
const unsigned char* data,
int64_t nbytes) {
// Note that this function will likely be faster if the address of data is
// aligned on a 64-byte boundary.
auto pool = arrow::internal::GetCpuThreadPool();
Expand Down Expand Up @@ -729,7 +734,7 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
return true;
}

uint64_t PlasmaClient::Impl::compute_object_hash(const ObjectBuffer& obj_buffer) {
uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) {
DCHECK(obj_buffer.metadata);
DCHECK(obj_buffer.data);
XXH64_state_t hash_state;
Expand All @@ -739,7 +744,7 @@ uint64_t PlasmaClient::Impl::compute_object_hash(const ObjectBuffer& obj_buffer)
}
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (obj_buffer.data->size() >= kBytesInMB) {
compute_object_hash_parallel(
ComputeObjectHashParallel(
&hash_state, reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
obj_buffer.data->size());
} else {
Expand Down Expand Up @@ -850,7 +855,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
return Status::PlasmaObjectNonexistent("Object not found");
}
// Compute the hash.
uint64_t hash = compute_object_hash(object_buffers[0]);
uint64_t hash = ComputeObjectHash(object_buffers[0]);
memcpy(digest, &hash, sizeof(hash));
return Status::OK();
}
Expand All @@ -877,11 +882,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {

Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size) {
auto notification = read_message_async(fd);
auto notification = ReadMessageAsync(fd);
if (notification == NULL) {
return Status::IOError("Failed to read object notification from Plasma socket");
}
auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(notification.get());
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
if (object_info->is_deletion()) {
Expand Down Expand Up @@ -977,18 +982,18 @@ Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
ObjectRequestType type = object_requests[i].type;
ObjectStatus status = object_requests[i].status;
fb::ObjectStatus status = object_requests[i].status;
switch (type) {
case ObjectRequestType::PLASMA_QUERY_LOCAL:
if (status == ObjectStatus::Local) {
if (status == fb::ObjectStatus::Local) {
*num_objects_ready += 1;
}
break;
case ObjectRequestType::PLASMA_QUERY_ANYWHERE:
if (status == ObjectStatus::Local || status == ObjectStatus::Remote) {
if (status == fb::ObjectStatus::Local || status == fb::ObjectStatus::Remote) {
*num_objects_ready += 1;
} else {
ARROW_CHECK(status == ObjectStatus::Nonexistent);
ARROW_CHECK(status == fb::ObjectStatus::Nonexistent);
}
break;
default:
Expand Down
16 changes: 9 additions & 7 deletions cpp/src/plasma/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

#include "plasma/plasma_generated.h"

namespace fb = plasma::flatbuf;

namespace plasma {

using arrow::Status;
Expand Down Expand Up @@ -123,24 +125,24 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}

Status plasma_error_status(PlasmaError plasma_error) {
Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
switch (plasma_error) {
case PlasmaError::OK:
case fb::PlasmaError::OK:
return Status::OK();
case PlasmaError::ObjectExists:
case fb::PlasmaError::ObjectExists:
return Status::PlasmaObjectExists("object already exists in the plasma store");
case PlasmaError::ObjectNonexistent:
case fb::PlasmaError::ObjectNonexistent:
return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
case PlasmaError::OutOfMemory:
case fb::PlasmaError::OutOfMemory:
return Status::PlasmaStoreFull("object does not fit in the plasma store");
default:
ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
}
return Status::OK();
}

ARROW_EXPORT ObjectStatus ObjectStatusLocal = ObjectStatus::Local;
ARROW_EXPORT ObjectStatus ObjectStatusRemote = ObjectStatus::Remote;
ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;

const PlasmaStoreInfo* plasma_config;

Expand Down
14 changes: 9 additions & 5 deletions cpp/src/plasma/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,15 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"

namespace plasma {

namespace flatbuf {

// Forward declaration outside the namespace, which is defined in plasma_generated.h.
enum class PlasmaError : int32_t;
enum class ObjectStatus : int32_t;

namespace plasma {
} // namespace flatbuf

constexpr int64_t kUniqueIDSize = 20;

Expand All @@ -58,7 +62,7 @@ static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");

typedef UniqueID ObjectID;

arrow::Status plasma_error_status(PlasmaError plasma_error);
arrow::Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error);

/// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t);
Expand Down Expand Up @@ -87,11 +91,11 @@ struct ObjectRequest {
/// - ObjectStatus::Nonexistent: object does not exist in the system.
/// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
/// for being transferred or it is transferring.
ObjectStatus status;
flatbuf::ObjectStatus status;
};

extern ObjectStatus ObjectStatusLocal;
extern ObjectStatus ObjectStatusRemote;
extern flatbuf::ObjectStatus ObjectStatusLocal;
extern flatbuf::ObjectStatus ObjectStatusRemote;

/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
Expand Down
Loading