Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-7991: [C++][Plasma] Allow option for evicting if full when creating an object #6520

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,16 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota);

Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
bool evict_if_full = true);

Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata);
const std::string& metadata, bool evict_if_full = true);

Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata);
const std::vector<std::string>& metadata,
bool evict_if_full = true);

Status Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
std::vector<ObjectBuffer>* object_buffers);
Expand Down Expand Up @@ -416,13 +418,14 @@ void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,

Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
std::shared_ptr<Buffer>* data, int device_num,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
<< data_size << " and metadata size " << metadata_size;
RETURN_NOT_OK(
SendCreateRequest(store_conn_, object_id, data_size, metadata_size, device_num));
RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, evict_if_full, data_size,
metadata_size, device_num));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaCreateReply, &buffer));
ObjectID id;
Expand Down Expand Up @@ -485,7 +488,8 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,

Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
const std::string& data,
const std::string& metadata) {
const std::string& metadata,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called CreateAndSeal on conn " << store_conn_;
Expand All @@ -496,7 +500,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,
reinterpret_cast<const uint8_t*>(metadata.data()), metadata.size());
memcpy(&digest[0], &hash, sizeof(hash));

RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, data, metadata, digest));
RETURN_NOT_OK(SendCreateAndSealRequest(store_conn_, object_id, evict_if_full, data,
metadata, digest));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealReply, &buffer));
Expand All @@ -506,7 +511,8 @@ Status PlasmaClient::Impl::CreateAndSeal(const ObjectID& object_id,

Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata) {
const std::vector<std::string>& metadata,
bool evict_if_full) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

ARROW_LOG(DEBUG) << "called CreateAndSealBatch on conn " << store_conn_;
Expand All @@ -522,8 +528,8 @@ Status PlasmaClient::Impl::CreateAndSealBatch(const std::vector<ObjectID>& objec
digests.push_back(digest);
}

RETURN_NOT_OK(
SendCreateAndSealBatchRequest(store_conn_, object_ids, data, metadata, digests));
RETURN_NOT_OK(SendCreateAndSealBatchRequest(store_conn_, object_ids, evict_if_full,
data, metadata, digests));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(
PlasmaReceive(store_conn_, MessageType::PlasmaCreateAndSealBatchReply, &buffer));
Expand Down Expand Up @@ -1131,19 +1137,22 @@ Status PlasmaClient::SetClientOptions(const std::string& client_name,

Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num);
std::shared_ptr<Buffer>* data, int device_num,
bool evict_if_full) {
return impl_->Create(object_id, data_size, metadata, metadata_size, data, device_num,
evict_if_full);
}

Status PlasmaClient::CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata) {
return impl_->CreateAndSeal(object_id, data, metadata);
const std::string& metadata, bool evict_if_full) {
return impl_->CreateAndSeal(object_id, data, metadata, evict_if_full);
}

Status PlasmaClient::CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata) {
return impl_->CreateAndSealBatch(object_ids, data, metadata);
const std::vector<std::string>& metadata,
bool evict_if_full) {
return impl_->CreateAndSealBatch(object_ids, data, metadata, evict_if_full);
}

Status PlasmaClient::Get(const std::vector<ObjectID>& object_ids, int64_t timeout_ms,
Expand Down
14 changes: 11 additions & 3 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,15 @@ class ARROW_EXPORT PlasmaClient {
/// device_num = 0 corresponds to the host,
/// device_num = 1 corresponds to GPU0,
/// device_num = 2 corresponds to GPU1, etc.
/// \param evict_if_full Whether to evict other objects to make space for
/// this object.
/// \return The return status.
///
/// The returned object must be released once it is done with. It must also
/// be either sealed or aborted.
Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0,
bool evict_if_full = true);

/// Create and seal an object in the object store. This is an optimization
/// which allows small objects to be created quickly with fewer messages to
Expand All @@ -104,20 +107,25 @@ class ARROW_EXPORT PlasmaClient {
/// \param object_id The ID of the object to create.
/// \param data The data for the object to create.
/// \param metadata The metadata for the object to create.
/// \param evict_if_full Whether to evict other objects to make space for
/// this object.
/// \return The return status.
Status CreateAndSeal(const ObjectID& object_id, const std::string& data,
const std::string& metadata);
const std::string& metadata, bool evict_if_full = true);

/// Create and seal multiple objects in the object store. This is an optimization
/// of CreateAndSeal to eliminate the cost of IPC per object.
///
/// \param object_ids The vector of IDs of the objects to create.
/// \param data The vector of data for the objects to create.
/// \param metadata The vector of metadata for the objects to create.
/// \param evict_if_full Whether to evict other objects to make space for
/// these objects.
/// \return The return status.
Status CreateAndSealBatch(const std::vector<ObjectID>& object_ids,
const std::vector<std::string>& data,
const std::vector<std::string>& metadata);
const std::vector<std::string>& metadata,
bool evict_if_full = true);

/// Get some objects from the Plasma Store. This function will block until the
/// objects have all been created and sealed in the Plasma Store or the
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/plasma/plasma.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,8 @@ table PlasmaGetDebugStringReply {
table PlasmaCreateRequest {
// ID of the object to be created.
object_id: string;
// Whether to evict other objects to make room for this one.
evict_if_full: bool;
// The size of the object's data in bytes.
data_size: ulong;
// The size of the object's metadata in bytes.
Expand Down Expand Up @@ -171,6 +173,8 @@ table PlasmaCreateReply {
table PlasmaCreateAndSealRequest {
// ID of the object to be created.
object_id: string;
// Whether to evict other objects to make room for this one.
evict_if_full: bool;
// The object's data.
data: string;
// The object's metadata.
Expand All @@ -186,6 +190,8 @@ table PlasmaCreateAndSealReply {

table PlasmaCreateAndSealBatchRequest {
object_ids: [string];
// Whether to evict other objects to make room for these objects.
evict_if_full: bool;
data: [string];
metadata: [string];
digest: [string];
Expand Down
Loading