Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/plasma/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ set(PLASMA_STORE_SRCS
dlmalloc.cc
events.cc
eviction_policy.cc
quota_aware_policy.cc
plasma_allocator.cc
store.cc
thirdparty/ae/ae.c)
Expand Down
36 changes: 36 additions & 0 deletions cpp/src/plasma/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
const std::string& manager_socket_name, int release_delay = 0,
int num_retries = -1);

Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota);

Status Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata,
int64_t metadata_size, std::shared_ptr<Buffer>* data, int device_num = 0);

Expand Down Expand Up @@ -245,6 +247,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp

Status Disconnect();

std::string DebugString();

bool IsInUse(const ObjectID& object_id);

int64_t store_capacity() { return store_capacity_; }
Expand Down Expand Up @@ -977,6 +981,15 @@ Status PlasmaClient::Impl::Connect(const std::string& store_socket_name,
return Status::OK();
}

Status PlasmaClient::Impl::SetClientOptions(const std::string& client_name,
int64_t output_memory_quota) {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
RETURN_NOT_OK(SendSetOptionsRequest(store_conn_, client_name, output_memory_quota));
std::vector<uint8_t> buffer;
RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSetOptionsReply, &buffer));
return ReadSetOptionsReply(buffer.data(), buffer.size());
}

Status PlasmaClient::Impl::Disconnect() {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);

Expand All @@ -991,6 +1004,22 @@ Status PlasmaClient::Impl::Disconnect() {
return Status::OK();
}

std::string PlasmaClient::Impl::DebugString() {
std::lock_guard<std::recursive_mutex> guard(client_mutex_);
if (!SendGetDebugStringRequest(store_conn_).ok()) {
return "error sending request";
}
std::vector<uint8_t> buffer;
if (!PlasmaReceive(store_conn_, MessageType::PlasmaGetDebugStringReply, &buffer).ok()) {
return "error receiving reply";
}
std::string debug_string;
if (!ReadGetDebugStringReply(buffer.data(), buffer.size(), &debug_string).ok()) {
return "error parsing reply";
}
return debug_string;
}

// ----------------------------------------------------------------------
// PlasmaClient

Expand All @@ -1005,6 +1034,11 @@ Status PlasmaClient::Connect(const std::string& store_socket_name,
num_retries);
}

Status PlasmaClient::SetClientOptions(const std::string& client_name,
int64_t output_memory_quota) {
return impl_->SetClientOptions(client_name, output_memory_quota);
}

Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
const uint8_t* metadata, int64_t metadata_size,
std::shared_ptr<Buffer>* data, int device_num) {
Expand Down Expand Up @@ -1070,6 +1104,8 @@ Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_

Status PlasmaClient::Disconnect() { return impl_->Disconnect(); }

std::string PlasmaClient::DebugString() { return impl_->DebugString(); }

bool PlasmaClient::IsInUse(const ObjectID& object_id) {
return impl_->IsInUse(object_id);
}
Expand Down
12 changes: 12 additions & 0 deletions cpp/src/plasma/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,13 @@ class ARROW_EXPORT PlasmaClient {
const std::string& manager_socket_name = "", int release_delay = 0,
int num_retries = -1);

/// Set runtime options for this client.
///
/// \param client_name The name of the client, used in debug messages.
/// \param output_memory_quota The memory quota in bytes for objects created by
/// this client.
Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota);

/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
///
Expand Down Expand Up @@ -250,6 +257,11 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Disconnect();

/// Get the current debug string from the plasma store server.
///
/// \return The debug string.
std::string DebugString();

/// Get the memory capacity of the store.
///
/// \return Memory capacity of the store in bytes.
Expand Down
76 changes: 66 additions & 10 deletions cpp/src/plasma/eviction_policy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "plasma/plasma_allocator.h"

#include <algorithm>
#include <sstream>

namespace plasma {

Expand All @@ -28,13 +29,47 @@ void LRUCache::Add(const ObjectID& key, int64_t size) {
// Note that it is important to use a list so the iterators stay valid.
item_list_.emplace_front(key, size);
item_map_.emplace(key, item_list_.begin());
used_capacity_ += size;
}

void LRUCache::Remove(const ObjectID& key) {
auto it = item_map_.find(key);
ARROW_CHECK(it != item_map_.end());
used_capacity_ -= it->second->second;
item_list_.erase(it->second);
item_map_.erase(it);
ARROW_CHECK(used_capacity_ >= 0) << DebugString();
}

void LRUCache::AdjustCapacity(int64_t delta) {
ARROW_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to "
<< (Capacity() + delta) << " (max " << OriginalCapacity() << ")";
capacity_ += delta;
ARROW_CHECK(used_capacity_ >= 0) << DebugString();
}

int64_t LRUCache::Capacity() const { return capacity_; }

int64_t LRUCache::OriginalCapacity() const { return original_capacity_; }

int64_t LRUCache::RemainingCapacity() const { return capacity_ - used_capacity_; }

void LRUCache::Foreach(std::function<void(const ObjectID&)> f) {
for (auto& pair : item_list_) {
f(pair.first);
}
}

std::string LRUCache::DebugString() const {
std::stringstream result;
result << "\n(" << name_ << ") capacity: " << Capacity();
result << "\n(" << name_
<< ") used: " << 100. * (1. - (RemainingCapacity() / (double)OriginalCapacity()))
<< "%";
result << "\n(" << name_ << ") num objects: " << item_map_.size();
result << "\n(" << name_ << ") num evictions: " << num_evictions_total_;
result << "\n(" << name_ << ") bytes evicted: " << bytes_evicted_total_;
return result.str();
}

int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required,
Expand All @@ -45,11 +80,14 @@ int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required,
it--;
objects_to_evict->push_back(it->first);
bytes_evicted += it->second;
bytes_evicted_total_ += it->second;
num_evictions_total_ += 1;
}
return bytes_evicted;
}

EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info) : store_info_(store_info) {}
EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info, int64_t max_size)
: pinned_memory_bytes_(0), store_info_(store_info), cache_("global lru", max_size) {}

int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
std::vector<ObjectID>* objects_to_evict) {
Expand All @@ -62,11 +100,22 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
return bytes_evicted;
}

void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
auto entry = store_info_->objects[object_id].get();
cache_.Add(object_id, entry->data_size + entry->metadata_size);
void EvictionPolicy::ObjectCreated(const ObjectID& object_id, Client* client,
bool is_create) {
cache_.Add(object_id, GetObjectSize(object_id));
}

bool EvictionPolicy::SetClientQuota(Client* client, int64_t output_memory_quota) {
return false;
}

bool EvictionPolicy::EnforcePerClientQuota(Client* client, int64_t size, bool is_create,
std::vector<ObjectID>* objects_to_evict) {
return true;
}

void EvictionPolicy::ClientDisconnected(Client* client) {}

bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
// Check if there is enough space to create the object.
int64_t required_space =
Expand All @@ -85,22 +134,29 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_t
return num_bytes_evicted >= required_space && num_bytes_evicted > 0;
}

void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id,
std::vector<ObjectID>* objects_to_evict) {
void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id) {
// If the object is in the LRU cache, remove it.
cache_.Remove(object_id);
pinned_memory_bytes_ += GetObjectSize(object_id);
}

void EvictionPolicy::EndObjectAccess(const ObjectID& object_id,
std::vector<ObjectID>* objects_to_evict) {
auto entry = store_info_->objects[object_id].get();
void EvictionPolicy::EndObjectAccess(const ObjectID& object_id) {
auto size = GetObjectSize(object_id);
// Add the object to the LRU cache.
cache_.Add(object_id, entry->data_size + entry->metadata_size);
cache_.Add(object_id, size);
pinned_memory_bytes_ -= size;
}

void EvictionPolicy::RemoveObject(const ObjectID& object_id) {
// If the object is in the LRU cache, remove it.
cache_.Remove(object_id);
}

int64_t EvictionPolicy::GetObjectSize(const ObjectID& object_id) const {
auto entry = store_info_->objects[object_id].get();
return entry->data_size + entry->metadata_size;
}

std::string EvictionPolicy::DebugString() const { return cache_.DebugString(); }

} // namespace plasma
Loading