Skip to content

Commit

Permalink
ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object
Browse files Browse the repository at this point in the history

Allow the client to pass a flag (`evict_if_full`, default `true`) when creating an object, specifying whether other objects may be evicted to make space for the new object.

Closes #6520 from stephanie-wang/try-evict and squashes the following commits:

9a9dc1a <Stephanie Wang> Merge branch 'master' into try-evict
9e8c08f <Stephanie Wang> fix
2f38969 <Stephanie Wang> Merge remote-tracking branch 'upstream/master' into try-evict
a32ab9b <Stephanie Wang> Default evict_if_full arg
9ddc881 <Stephanie Wang> document arg
c2ba17c <Stephanie Wang> fix pyx
48e70bf <Stephanie Wang> Fix cpp
62b2f63 <Stephanie Wang> fix tests
2889274 <Stephanie Wang> protocol
ecef915 <Stephanie Wang> Add flag to evict if full

Authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Signed-off-by: Philipp Moritz <pcmoritz@gmail.com>
  • Loading branch information
stephanie-wang authored and pcmoritz committed Mar 9, 2020
1 parent e728316 commit af45b92
Show file tree
Hide file tree
Showing 10 changed files with 197 additions and 91 deletions.
43 changes: 26 additions & 17 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,16 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota);

Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
bool evict_if_full = true);

Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata);
const std::string& metadata, bool evict_if_full = true);

Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata);
const std::vector<std::string>& metadata,
bool evict_if_full = true);

Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer>* object_buffers);
Expand Down Expand Up @@ -416,13 +418,14 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,

Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
std::shared_ptr<Buffer>* data, int device_num,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(
SendCreateRequest(store_conn_, object_id, data_size, metadata_size, device_num));
RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size,
metadata_size, device_num));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer));
ObjectID id;
Expand Down Expand Up @@ -485,7 +488,8 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,

Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
const std::string& data,
const std::string& metadata) {
const std::string& metadata,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
Expand All @@ -496,7 +500,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
memcpy(&digest[0], &hash, sizeof(hash));

RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, metadata, digest));
RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data,
metadata, digest));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer));
Expand All @@ -506,7 +511,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,

Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata) {
const std::vector<std::string>& metadata,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
Expand All @@ -522,8 +528,8 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& objec
digests.push_back(digest);
}

RETURN_NOT_OK(
SendCreateAndSealBatchRequest(store_conn_, object_ids, data, metadata, digests));
RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full,
data, metadata, digests));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer));
Expand Down Expand Up @@ -1131,19 +1137,22 @@ Status PlasmaClient::SetClientOptions(const std::string& client_name,

Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num);
std::shared_ptr<Buffer>* data, int device_num,
bool evict_if_full) {
return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num,
evict_if_full);
}

Status PlasmaClient::CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata) {
return impl_->CreateAndSeal(object_id, data, metadata);
const std::string& metadata, bool evict_if_full) {
return impl_->CreateAndSeal(object_id, data, metadata, evict_if_full);
}

Status PlasmaClient::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata) {
return impl_->CreateAndSealBatch(object_ids, data, metadata);
const std::vector<std::string>& metadata,
bool evict_if_full) {
return impl_->CreateAndSealBatch(object_ids, data, metadata, evict_if_full);
}

Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
Expand Down
14 changes: 11 additions & 3 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ class ARROW_EXPORT PlasmaClient {
/// device_num = 0 corresponds to the host,
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
/// \param evict_if_full Whether to evict other objects to make space for
/// this object.
/// \return The return status.
///
/// The returned object must be released once it is done with. It must also
/// be either sealed or aborted.
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
bool evict_if_full = true);

/// Create and seal an object in the object store. This is an optimization
/// which allows small objects to be created quickly with fewer messages to
Expand All @@ -104,20 +107,25 @@ class ARROW_EXPORT PlasmaClient {
/// \param object_id The ID of the object to create.
/// \param data The data for the object to create.
/// \param metadata The metadata for the object to create.
/// \param evict_if_full Whether to evict other objects to make space for
/// this object.
/// \return The return status.
Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata);
const std::string& metadata, bool evict_if_full = true);

/// Create and seal multiple objects in the object store. This is an optimization
/// of CreateAndSeal to eliminate the cost of IPC per object.
///
/// \param object_ids The vector of IDs of the objects to create.
/// \param data The vector of data for the objects to create.
/// \param metadata The vector of metadata for the objects to create.
/// \param evict_if_full Whether to evict other objects to make space for
/// these objects.
/// \return The return status.
Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata);
const std::vector<std::string>& metadata,
bool evict_if_full = true);

/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/plasma/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ table PlasmaGetDebugStringReply {
table PlasmaCreateRequest {
// ID of the object to be created.
object_id: string;
// Whether to evict other objects to make room for this one.
evict_if_full: bool;
// The size of the object's data in bytes.
data_size: ulong;
// The size of the object's metadata in bytes.
Expand Down Expand Up @@ -171,6 +173,8 @@ table PlasmaCreateReply {
table PlasmaCreateAndSealRequest {
// ID of the object to be created.
object_id: string;
// Whether to evict other objects to make room for this one.
evict_if_full: bool;
// The object's data.
data: string;
// The object's metadata.
Expand All @@ -186,6 +190,8 @@ table PlasmaCreateAndSealReply {

table PlasmaCreateAndSealBatchRequest {
object_ids: [string];
// Whether to evict other objects to make room for these objects.
evict_if_full: bool;
data: [string];
metadata: [string];
digest: [string];
Expand Down
Loading

0 comments on commit af45b92

Please sign in to comment.