From f3638b843fcfeeeb3a67cd977aa09ed9e53f3462 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 23 Jul 2019 17:35:15 -0700 Subject: [PATCH] fix the rebase --- cpp/src/plasma/CMakeLists.txt | 1 + cpp/src/plasma/client.cc | 36 ++++ cpp/src/plasma/client.h | 12 ++ cpp/src/plasma/eviction_policy.cc | 76 ++++++-- cpp/src/plasma/eviction_policy.h | 105 ++++++++++-- cpp/src/plasma/format/plasma.fbs | 30 +++- cpp/src/plasma/plasma.h | 20 ++- cpp/src/plasma/protocol.cc | 55 ++++++ cpp/src/plasma/protocol.h | 20 +++ cpp/src/plasma/quota_aware_policy.cc | 167 ++++++++++++++++++ cpp/src/plasma/quota_aware_policy.h | 90 ++++++++++ cpp/src/plasma/store.cc | 51 ++++-- cpp/src/plasma/store.h | 25 +-- cpp/src/plasma/test/client_tests.cc | 248 +++++++++++++++++++++++++++ python/pyarrow/_plasma.pyx | 18 ++ 15 files changed, 892 insertions(+), 62 deletions(-) create mode 100644 cpp/src/plasma/quota_aware_policy.cc create mode 100644 cpp/src/plasma/quota_aware_policy.h diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt index 4687ce9e65cd..3403ada29983 100644 --- a/cpp/src/plasma/CMakeLists.txt +++ b/cpp/src/plasma/CMakeLists.txt @@ -78,6 +78,7 @@ set(PLASMA_STORE_SRCS dlmalloc.cc events.cc eviction_policy.cc + quota_aware_policy.cc plasma_allocator.cc store.cc thirdparty/ae/ae.c) diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc index a6cdf7f17ca9..a7f1a37af46c 100644 --- a/cpp/src/plasma/client.cc +++ b/cpp/src/plasma/client.cc @@ -207,6 +207,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this* data, int device_num = 0); @@ -245,6 +247,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this guard(client_mutex_); + RETURN_NOT_OK(SendSetOptionsRequest(store_conn_, client_name, output_memory_quota)); + std::vector buffer; + RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType::PlasmaSetOptionsReply, &buffer)); + return ReadSetOptionsReply(buffer.data(), buffer.size()); +} + Status PlasmaClient::Impl::Disconnect() { 
std::lock_guard guard(client_mutex_); @@ -991,6 +1004,22 @@ Status PlasmaClient::Impl::Disconnect() { return Status::OK(); } +std::string PlasmaClient::Impl::DebugString() { + std::lock_guard guard(client_mutex_); + if (!SendGetDebugStringRequest(store_conn_).ok()) { + return "error sending request"; + } + std::vector buffer; + if (!PlasmaReceive(store_conn_, MessageType::PlasmaGetDebugStringReply, &buffer).ok()) { + return "error receiving reply"; + } + std::string debug_string; + if (!ReadGetDebugStringReply(buffer.data(), buffer.size(), &debug_string).ok()) { + return "error parsing reply"; + } + return debug_string; +} + // ---------------------------------------------------------------------- // PlasmaClient @@ -1005,6 +1034,11 @@ Status PlasmaClient::Connect(const std::string& store_socket_name, num_retries); } +Status PlasmaClient::SetClientOptions(const std::string& client_name, + int64_t output_memory_quota) { + return impl_->SetClientOptions(client_name, output_memory_quota); +} + Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size, const uint8_t* metadata, int64_t metadata_size, std::shared_ptr* data, int device_num) { @@ -1070,6 +1104,8 @@ Status PlasmaClient::DecodeNotification(const uint8_t* buffer, ObjectID* object_ Status PlasmaClient::Disconnect() { return impl_->Disconnect(); } +std::string PlasmaClient::DebugString() { return impl_->DebugString(); } + bool PlasmaClient::IsInUse(const ObjectID& object_id) { return impl_->IsInUse(object_id); } diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h index facfd37ca788..766fc56e8e4c 100644 --- a/cpp/src/plasma/client.h +++ b/cpp/src/plasma/client.h @@ -65,6 +65,13 @@ class ARROW_EXPORT PlasmaClient { const std::string& manager_socket_name = "", int release_delay = 0, int num_retries = -1); + /// Set runtime options for this client. + /// + /// \param client_name The name of the client, used in debug messages. 
+ /// \param output_memory_quota The memory quota in bytes for objects created by + /// this client. + Status SetClientOptions(const std::string& client_name, int64_t output_memory_quota); + /// Create an object in the Plasma Store. Any metadata for this object must be /// be passed in when the object is created. /// @@ -250,6 +257,11 @@ class ARROW_EXPORT PlasmaClient { /// \return The return status. Status Disconnect(); + /// Get the current debug string from the plasma store server. + /// + /// \return The debug string. + std::string DebugString(); + /// Get the memory capacity of the store. /// /// \return Memory capacity of the store in bytes. diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc index da5df5a36ddd..a9cbb89582cf 100644 --- a/cpp/src/plasma/eviction_policy.cc +++ b/cpp/src/plasma/eviction_policy.cc @@ -19,6 +19,7 @@ #include "plasma/plasma_allocator.h" #include +#include namespace plasma { @@ -28,13 +29,47 @@ void LRUCache::Add(const ObjectID& key, int64_t size) { // Note that it is important to use a list so the iterators stay valid. 
item_list_.emplace_front(key, size); item_map_.emplace(key, item_list_.begin()); + used_capacity_ += size; } void LRUCache::Remove(const ObjectID& key) { auto it = item_map_.find(key); ARROW_CHECK(it != item_map_.end()); + used_capacity_ -= it->second->second; item_list_.erase(it->second); item_map_.erase(it); + ARROW_CHECK(used_capacity_ >= 0) << DebugString(); +} + +void LRUCache::AdjustCapacity(int64_t delta) { + ARROW_LOG(INFO) << "adjusting global lru capacity from " << Capacity() << " to " + << (Capacity() + delta) << " (max " << OriginalCapacity() << ")"; + capacity_ += delta; + ARROW_CHECK(used_capacity_ >= 0) << DebugString(); +} + +int64_t LRUCache::Capacity() const { return capacity_; } + +int64_t LRUCache::OriginalCapacity() const { return original_capacity_; } + +int64_t LRUCache::RemainingCapacity() const { return capacity_ - used_capacity_; } + +void LRUCache::Foreach(std::function f) { + for (auto& pair : item_list_) { + f(pair.first); + } +} + +std::string LRUCache::DebugString() const { + std::stringstream result; + result << "\n(" << name_ << ") capacity: " << Capacity(); + result << "\n(" << name_ + << ") used: " << 100. * (1. 
- (RemainingCapacity() / (double)OriginalCapacity())) + << "%"; + result << "\n(" << name_ << ") num objects: " << item_map_.size(); + result << "\n(" << name_ << ") num evictions: " << num_evictions_total_; + result << "\n(" << name_ << ") bytes evicted: " << bytes_evicted_total_; + return result.str(); } int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required, @@ -45,11 +80,14 @@ int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required, it--; objects_to_evict->push_back(it->first); bytes_evicted += it->second; + bytes_evicted_total_ += it->second; + num_evictions_total_ += 1; } return bytes_evicted; } -EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info) : store_info_(store_info) {} +EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info, int64_t max_size) + : pinned_memory_bytes_(0), store_info_(store_info), cache_("global lru", max_size) {} int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required, std::vector* objects_to_evict) { @@ -62,11 +100,22 @@ int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required, return bytes_evicted; } -void EvictionPolicy::ObjectCreated(const ObjectID& object_id) { - auto entry = store_info_->objects[object_id].get(); - cache_.Add(object_id, entry->data_size + entry->metadata_size); +void EvictionPolicy::ObjectCreated(const ObjectID& object_id, Client* client, + bool is_create) { + cache_.Add(object_id, GetObjectSize(object_id)); } +bool EvictionPolicy::SetClientQuota(Client* client, int64_t output_memory_quota) { + return false; +} + +bool EvictionPolicy::EnforcePerClientQuota(Client* client, int64_t size, bool is_create, + std::vector* objects_to_evict) { + return true; +} + +void EvictionPolicy::ClientDisconnected(Client* client) {} + bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_to_evict) { // Check if there is enough space to create the object. 
int64_t required_space = @@ -85,17 +134,17 @@ bool EvictionPolicy::RequireSpace(int64_t size, std::vector* objects_t return num_bytes_evicted >= required_space && num_bytes_evicted > 0; } -void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id, - std::vector* objects_to_evict) { +void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id) { // If the object is in the LRU cache, remove it. cache_.Remove(object_id); + pinned_memory_bytes_ += GetObjectSize(object_id); } -void EvictionPolicy::EndObjectAccess(const ObjectID& object_id, - std::vector* objects_to_evict) { - auto entry = store_info_->objects[object_id].get(); +void EvictionPolicy::EndObjectAccess(const ObjectID& object_id) { + auto size = GetObjectSize(object_id); // Add the object to the LRU cache. - cache_.Add(object_id, entry->data_size + entry->metadata_size); + cache_.Add(object_id, size); + pinned_memory_bytes_ -= size; } void EvictionPolicy::RemoveObject(const ObjectID& object_id) { @@ -103,4 +152,11 @@ void EvictionPolicy::RemoveObject(const ObjectID& object_id) { cache_.Remove(object_id); } +int64_t EvictionPolicy::GetObjectSize(const ObjectID& object_id) const { + auto entry = store_info_->objects[object_id].get(); + return entry->data_size + entry->metadata_size; +} + +std::string EvictionPolicy::DebugString() const { return cache_.DebugString(); } + } // namespace plasma diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h index 68342ae102f3..d4fabb9b5f0b 100644 --- a/cpp/src/plasma/eviction_policy.h +++ b/cpp/src/plasma/eviction_policy.h @@ -18,7 +18,9 @@ #ifndef PLASMA_EVICTION_POLICY_H #define PLASMA_EVICTION_POLICY_H +#include #include +#include #include #include #include @@ -33,10 +35,18 @@ namespace plasma { // This file contains declaration for all functions and data structures that // need to be provided if you want to implement a new eviction algorithm for the // Plasma store. 
+// +// It does not implement memory quotas; see quota_aware_policy for that. class LRUCache { public: - LRUCache() {} + LRUCache(const std::string& name, int64_t size) + : name_(name), + original_capacity_(size), + capacity_(size), + used_capacity_(0), + num_evictions_total_(0), + bytes_evicted_total_(0) {} void Add(const ObjectID& key, int64_t size); @@ -45,6 +55,18 @@ class LRUCache { int64_t ChooseObjectsToEvict(int64_t num_bytes_required, std::vector* objects_to_evict); + int64_t OriginalCapacity() const; + + int64_t Capacity() const; + + int64_t RemainingCapacity() const; + + void AdjustCapacity(int64_t delta); + + void Foreach(std::function); + + std::string DebugString() const; + private: /// A doubly-linked list containing the items in the cache and /// their sizes in LRU order. @@ -53,6 +75,19 @@ class LRUCache { /// A hash table mapping the object ID of an object in the cache to its /// location in the doubly linked list item_list_. std::unordered_map item_map_; + + /// The name of this cache, used for debugging purposes only. + const std::string name_; + /// The original (max) capacity of this cache in bytes. + const int64_t original_capacity_; + /// The current capacity, which must be <= the original capacity. + int64_t capacity_; + /// The number of bytes used of the available capacity. + int64_t used_capacity_; + /// The number of objects evicted from this cache. + int64_t num_evictions_total_; + /// The number of bytes evicted from this cache. + int64_t bytes_evicted_total_; }; /// The eviction policy. @@ -62,7 +97,11 @@ class EvictionPolicy { /// /// @param store_info Information about the Plasma store that is exposed /// to the eviction policy. - explicit EvictionPolicy(PlasmaStoreInfo* store_info); + /// @param max_size Max size in bytes total of objects to store. + explicit EvictionPolicy(PlasmaStoreInfo* store_info, int64_t max_size); + + /// Destroy an eviction policy. 
+ virtual ~EvictionPolicy() {} /// This method will be called whenever an object is first created in order to /// add it to the LRU cache. This is done so that the first time, the Plasma @@ -70,7 +109,38 @@ class EvictionPolicy { /// cache. /// /// @param object_id The object ID of the object that was created. - void ObjectCreated(const ObjectID& object_id); + /// @param client The pointer to the client. + /// @param is_create Whether we are creating a new object (vs reading an object). + virtual void ObjectCreated(const ObjectID& object_id, Client* client, bool is_create); + + /// Set quota for a client. + /// + /// @param client The pointer to the client. + /// @param output_memory_quota Set the quota for this client. This can only be + /// called once per client. This is effectively the equivalent of giving + /// the client its own LRU cache instance. The memory for this is taken + /// out of the capacity of the global LRU cache for the client lifetime. + /// + /// @return True if enough space can be reserved for the given client quota. + virtual bool SetClientQuota(Client* client, int64_t output_memory_quota); + + /// Determine what objects need to be evicted to enforce the given client's quota. + /// + /// @param client The pointer to the client creating the object. + /// @param size The size of the object to create. + /// @param is_create Whether we are creating a new object (vs reading an object). + /// @param objects_to_evict The object IDs that were chosen for eviction will + /// be stored into this vector. + /// + /// @return True if enough space could be freed and false otherwise. + virtual bool EnforcePerClientQuota(Client* client, int64_t size, bool is_create, + std::vector* objects_to_evict); + + /// Called to clean up any resources allocated by this client. This merges any + /// per-client LRU queue created by SetClientQuota into the global LRU queue. + /// + /// @param client The pointer to the client. 
+ virtual void ClientDisconnected(Client* client); /// This method will be called when the Plasma store needs more space, perhaps /// to create a new object. When this method is called, the eviction @@ -82,7 +152,7 @@ class EvictionPolicy { /// @param objects_to_evict The object IDs that were chosen for eviction will /// be stored into this vector. /// @return True if enough space can be freed and false otherwise. - bool RequireSpace(int64_t size, std::vector* objects_to_evict); + virtual bool RequireSpace(int64_t size, std::vector* objects_to_evict); /// This method will be called whenever an unused object in the Plasma store /// starts to be used. When this method is called, the eviction policy will @@ -90,10 +160,7 @@ class EvictionPolicy { /// the Plasma store by the caller. /// /// @param object_id The ID of the object that is now being used. - /// @param objects_to_evict The object IDs that were chosen for eviction will - /// be stored into this vector. - void BeginObjectAccess(const ObjectID& object_id, - std::vector* objects_to_evict); + virtual void BeginObjectAccess(const ObjectID& object_id); /// This method will be called whenever an object in the Plasma store that was /// being used is no longer being used. When this method is called, the @@ -101,10 +168,7 @@ class EvictionPolicy { /// fact be evicted from the Plasma store by the caller. /// /// @param object_id The ID of the object that is no longer being used. - /// @param objects_to_evict The object IDs that were chosen for eviction will - /// be stored into this vector. - void EndObjectAccess(const ObjectID& object_id, - std::vector* objects_to_evict); + virtual void EndObjectAccess(const ObjectID& object_id); /// Choose some objects to evict from the Plasma store. 
When this method is /// called, the eviction policy will assume that the objects chosen to be @@ -117,15 +181,24 @@ class EvictionPolicy { /// @param objects_to_evict The object IDs that were chosen for eviction will /// be stored into this vector. /// @return The total number of bytes of space chosen to be evicted. - int64_t ChooseObjectsToEvict(int64_t num_bytes_required, - std::vector* objects_to_evict); + virtual int64_t ChooseObjectsToEvict(int64_t num_bytes_required, + std::vector* objects_to_evict); /// This method will be called when an object is going to be removed /// /// @param object_id The ID of the object that is now being used. - void RemoveObject(const ObjectID& object_id); + virtual void RemoveObject(const ObjectID& object_id); + + /// Returns debugging information for this eviction policy. + virtual std::string DebugString() const; + + protected: + /// Returns the size of the object + int64_t GetObjectSize(const ObjectID& object_id) const; + + /// The number of bytes pinned by applications. + int64_t pinned_memory_bytes_; - private: /// Pointer to the plasma store info. PlasmaStoreInfo* store_info_; /// Datastructure for the LRU cache. diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs index b3c890391887..74b26bf600c8 100644 --- a/cpp/src/plasma/format/plasma.fbs +++ b/cpp/src/plasma/format/plasma.fbs @@ -67,7 +67,13 @@ enum MessageType:long { // reply messages get sent. Each one contains a fixed number of bytes. PlasmaDataReply, // Object notifications. - PlasmaNotification + PlasmaNotification, + // Set memory quota for a client. + PlasmaSetOptionsRequest, + PlasmaSetOptionsReply, + // Get debugging information from the store. + PlasmaGetDebugStringRequest, + PlasmaGetDebugStringReply, } enum PlasmaError:int { @@ -82,7 +88,7 @@ enum PlasmaError:int { // Trying to delete an object but it's not sealed. ObjectNotSealed, // Trying to delete an object but it's in use. 
- ObjectInUse + ObjectInUse, } // Plasma store messages @@ -103,6 +109,26 @@ struct PlasmaObjectSpec { device_num: int; } +table PlasmaSetOptionsRequest { + // The name of the client. + client_name: string; + // The size of the output memory limit in bytes. + output_memory_quota: long; +} + +table PlasmaSetOptionsReply { + // Whether setting options succeeded. + error: PlasmaError; +} + +table PlasmaGetDebugStringRequest { +} + +table PlasmaGetDebugStringReply { + // The debug string from the server. + debug_string: string; +} + table PlasmaCreateRequest { // ID of the object to be created. object_id: string; diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h index 0948e6de8e99..6ebb68774c38 100644 --- a/cpp/src/plasma/plasma.h +++ b/cpp/src/plasma/plasma.h @@ -69,7 +69,25 @@ struct ObjectInfoT; /// Allocation granularity used in plasma for object allocation. constexpr int64_t kBlockSize = 64; -struct Client; +/// Contains all information that is associated with a Plasma store client. +struct Client { + explicit Client(int fd); + + /// The file descriptor used to communicate with the client. + int fd; + + /// Object ids that are used by this client. + std::unordered_set object_ids; + + /// File descriptors that are used by this client. + std::unordered_set used_fds; + + /// The file descriptor used to push notifications to client. This is only valid + /// if client subscribes to plasma store. -1 indicates invalid. + int notification_fd; + + std::string name = "anonymous_client"; +}; // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec. struct PlasmaObject { diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc index c22d77d60190..65c57a66f359 100644 --- a/cpp/src/plasma/protocol.cc +++ b/cpp/src/plasma/protocol.cc @@ -100,6 +100,61 @@ Status PlasmaErrorStatus(fb::PlasmaError plasma_error) { return Status::OK(); } +// Set options messages. 
+ +Status SendSetOptionsRequest(int sock, const std::string& client_name, + int64_t output_memory_limit) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSetOptionsRequest(fbb, fbb.CreateString(client_name), + output_memory_limit); + return PlasmaSend(sock, MessageType::PlasmaSetOptionsRequest, &fbb, message); +} + +Status ReadSetOptionsRequest(uint8_t* data, size_t size, std::string* client_name, + int64_t* output_memory_quota) { + DCHECK(data); + auto message = flatbuffers::GetRoot(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *client_name = std::string(message->client_name()->str()); + *output_memory_quota = message->output_memory_quota(); + return Status::OK(); +} + +Status SendSetOptionsReply(int sock, PlasmaError error) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaSetOptionsReply(fbb, error); + return PlasmaSend(sock, MessageType::PlasmaSetOptionsReply, &fbb, message); +} + +Status ReadSetOptionsReply(uint8_t* data, size_t size) { + DCHECK(data); + auto message = flatbuffers::GetRoot(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + return PlasmaErrorStatus(message->error()); +} + +// Get debug string messages. 
+ +Status SendGetDebugStringRequest(int sock) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaGetDebugStringRequest(fbb); + return PlasmaSend(sock, MessageType::PlasmaGetDebugStringRequest, &fbb, message); +} + +Status SendGetDebugStringReply(int sock, const std::string& debug_string) { + flatbuffers::FlatBufferBuilder fbb; + auto message = fb::CreatePlasmaGetDebugStringReply(fbb, fbb.CreateString(debug_string)); + return PlasmaSend(sock, MessageType::PlasmaGetDebugStringReply, &fbb, message); +} + +Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_string) { + DCHECK(data); + auto message = flatbuffers::GetRoot(data); + DCHECK(VerifyFlatbuffer(message, data, size)); + *debug_string = message->debug_string()->str(); + return Status::OK(); +} + // Create messages. Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h index 0362bd47797d..481eadd3a00e 100644 --- a/cpp/src/plasma/protocol.h +++ b/cpp/src/plasma/protocol.h @@ -44,6 +44,26 @@ bool VerifyFlatbuffer(T* object, uint8_t* data, size_t size) { Status PlasmaReceive(int sock, MessageType message_type, std::vector* buffer); +/* Set options messages. */ + +Status SendSetOptionsRequest(int sock, const std::string& client_name, + int64_t output_memory_limit); + +Status ReadSetOptionsRequest(uint8_t* data, size_t size, std::string* client_name, + int64_t* output_memory_quota); + +Status SendSetOptionsReply(int sock, PlasmaError error); + +Status ReadSetOptionsReply(uint8_t* data, size_t size); + +/* Debug string messages. */ + +Status SendGetDebugStringRequest(int sock); + +Status SendGetDebugStringReply(int sock, const std::string& debug_string); + +Status ReadGetDebugStringReply(uint8_t* data, size_t size, std::string* debug_string); + /* Plasma Create message functions. 
*/ Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, diff --git a/cpp/src/plasma/quota_aware_policy.cc b/cpp/src/plasma/quota_aware_policy.cc new file mode 100644 index 000000000000..57f39d50b36b --- /dev/null +++ b/cpp/src/plasma/quota_aware_policy.cc @@ -0,0 +1,167 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "plasma/quota_aware_policy.h" +#include "plasma/common.h" +#include "plasma/plasma_allocator.h" + +#include +#include +#include + +namespace plasma { + +QuotaAwarePolicy::QuotaAwarePolicy(PlasmaStoreInfo* store_info, int64_t max_size) + : EvictionPolicy(store_info, max_size) {} + +bool QuotaAwarePolicy::HasQuota(Client* client, bool is_create) { + if (!is_create) { + return false; // no quota enforcement on read requests yet + } + return per_client_cache_.find(client) != per_client_cache_.end(); +} + +void QuotaAwarePolicy::ObjectCreated(const ObjectID& object_id, Client* client, + bool is_create) { + if (HasQuota(client, is_create)) { + per_client_cache_[client]->Add(object_id, GetObjectSize(object_id)); + owned_by_client_[object_id] = client; + } else { + EvictionPolicy::ObjectCreated(object_id, client, is_create); + } +} + +bool QuotaAwarePolicy::SetClientQuota(Client* client, int64_t output_memory_quota) { + if (per_client_cache_.find(client) != per_client_cache_.end()) { + ARROW_LOG(WARNING) << "Cannot change the client quota once set"; + return false; + } + + if (cache_.Capacity() - output_memory_quota < + cache_.OriginalCapacity() * kGlobalLruReserveFraction) { + ARROW_LOG(WARNING) << "Not enough memory to set client quota: " << DebugString(); + return false; + } + + // those objects will be lazily evicted on the next call + cache_.AdjustCapacity(-output_memory_quota); + per_client_cache_[client] = + std::unique_ptr(new LRUCache(client->name, output_memory_quota)); + return true; +} + +bool QuotaAwarePolicy::EnforcePerClientQuota(Client* client, int64_t size, bool is_create, + std::vector* objects_to_evict) { + if (!HasQuota(client, is_create)) { + return true; + } + + auto& client_cache = per_client_cache_[client]; + if (size > client_cache->Capacity()) { + ARROW_LOG(WARNING) << "object too large (" << size + << " bytes) to fit in client quota " << client_cache->Capacity() + << " " << DebugString(); + return false; + } + + if 
(client_cache->RemainingCapacity() >= size) { + return true; + } + + int64_t space_to_free = size - client_cache->RemainingCapacity(); + if (space_to_free > 0) { + std::vector candidates; + client_cache->ChooseObjectsToEvict(space_to_free, &candidates); + for (ObjectID& object_id : candidates) { + if (shared_for_read_.count(object_id)) { + // Pinned so we can't evict it, so demote the object to global LRU instead. + // We can do this by simply removing it from all data structures, so that + // the next EndObjectAccess() will add it back to global LRU. + shared_for_read_.erase(object_id); + } else { + objects_to_evict->push_back(object_id); + } + owned_by_client_.erase(object_id); + client_cache->Remove(object_id); + } + } + return true; +} + +void QuotaAwarePolicy::BeginObjectAccess(const ObjectID& object_id) { + if (owned_by_client_.find(object_id) != owned_by_client_.end()) { + shared_for_read_.insert(object_id); + pinned_memory_bytes_ += GetObjectSize(object_id); + return; + } + EvictionPolicy::BeginObjectAccess(object_id); +} + +void QuotaAwarePolicy::EndObjectAccess(const ObjectID& object_id) { + if (owned_by_client_.find(object_id) != owned_by_client_.end()) { + shared_for_read_.erase(object_id); + pinned_memory_bytes_ -= GetObjectSize(object_id); + return; + } + EvictionPolicy::EndObjectAccess(object_id); +} + +void QuotaAwarePolicy::RemoveObject(const ObjectID& object_id) { + if (owned_by_client_.find(object_id) != owned_by_client_.end()) { + per_client_cache_[owned_by_client_[object_id]]->Remove(object_id); + owned_by_client_.erase(object_id); + shared_for_read_.erase(object_id); + return; + } + EvictionPolicy::RemoveObject(object_id); +} + +void QuotaAwarePolicy::ClientDisconnected(Client* client) { + if (per_client_cache_.find(client) == per_client_cache_.end()) { + return; + } + // return capacity back to global LRU + cache_.AdjustCapacity(per_client_cache_[client]->Capacity()); + // clean up any entries used to track this client's quota usage +
per_client_cache_[client]->Foreach([this](const ObjectID& obj) { + if (!shared_for_read_.count(obj)) { + // only add it to the global LRU if it is not pinned (shared for read); + // otherwise, EndObjectAccess will add it later + cache_.Add(obj, GetObjectSize(obj)); + } + owned_by_client_.erase(obj); + shared_for_read_.erase(obj); + }); + per_client_cache_.erase(client); +} + +std::string QuotaAwarePolicy::DebugString() const { + std::stringstream result; + result << "num clients with quota: " << per_client_cache_.size(); + result << "\nquota map size: " << owned_by_client_.size(); + result << "\npinned quota map size: " << shared_for_read_.size(); + result << "\nallocated bytes: " << PlasmaAllocator::Allocated(); + result << "\nallocation limit: " << PlasmaAllocator::GetFootprintLimit(); + result << "\npinned bytes: " << pinned_memory_bytes_; + result << cache_.DebugString(); + for (const auto& pair : per_client_cache_) { + result << pair.second->DebugString(); + } + return result.str(); +} + +} // namespace plasma diff --git a/cpp/src/plasma/quota_aware_policy.h b/cpp/src/plasma/quota_aware_policy.h new file mode 100644 index 000000000000..d92d5623d743 --- /dev/null +++ b/cpp/src/plasma/quota_aware_policy.h @@ -0,0 +1,90 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PLASMA_QUOTA_AWARE_POLICY_H +#define PLASMA_QUOTA_AWARE_POLICY_H + +#include +#include +#include +#include +#include +#include +#include + +#include "plasma/common.h" +#include "plasma/eviction_policy.h" +#include "plasma/plasma.h" + +namespace plasma { + +/// Reserve this fraction of memory for shared usage. Attempts to set client +/// quotas that would cause the global LRU memory fraction to fall below this +/// value will be rejected. +constexpr double kGlobalLruReserveFraction = 0.3; + +/// Extends the basic eviction policy to implement per-client memory quotas. +/// This effectively gives each client its own LRU queue, which caps its +/// memory usage and protects this memory from being evicted by other clients. +/// +/// The quotas are enforced when objects are first created, by evicting the +/// necessary number of objects from the client's own LRU queue to cap its +/// memory usage. Once that is done, allocation is handled by the normal +/// eviction policy. This may result in the eviction of objects from the +/// global LRU queue, if not enough memory can be allocated even after the +/// evictions from the client's own LRU queue. +/// +/// Some special cases: +/// - When a pinned object is "evicted" from a per-client queue, it is +/// instead transferred into the global LRU queue. +/// - When a client disconnects, its LRU queue is merged into the head of the +/// global LRU queue. +class QuotaAwarePolicy : public EvictionPolicy { + public: + /// Construct a quota-aware eviction policy. + /// + /// @param store_info Information about the Plasma store that is exposed + /// to the eviction policy. + /// @param max_size Max size in bytes total of objects to store. 
+ explicit QuotaAwarePolicy(PlasmaStoreInfo* store_info, int64_t max_size); + void ObjectCreated(const ObjectID& object_id, Client* client, bool is_create) override; + bool SetClientQuota(Client* client, int64_t output_memory_quota) override; + bool EnforcePerClientQuota(Client* client, int64_t size, bool is_create, + std::vector* objects_to_evict) override; + void ClientDisconnected(Client* client) override; + void BeginObjectAccess(const ObjectID& object_id) override; + void EndObjectAccess(const ObjectID& object_id) override; + void RemoveObject(const ObjectID& object_id) override; + std::string DebugString() const override; + + private: + /// Returns whether we are enforcing memory quotas for an operation. + bool HasQuota(Client* client, bool is_create); + + /// Per-client LRU caches, if quota is enabled. + std::unordered_map> per_client_cache_; + /// Tracks which client created which object. This only applies to clients + /// that have a memory quota set. + std::unordered_map owned_by_client_; + /// Tracks which objects are mapped for read and hence can't be evicted. + /// However these objects are still tracked within the client caches. 
+ std::unordered_set shared_for_read_; +}; + +} // namespace plasma + +#endif // PLASMA_QUOTA_AWARE_POLICY_H diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc index 2c3361e3de76..dbcd3df7d036 100644 --- a/cpp/src/plasma/store.cc +++ b/cpp/src/plasma/store.cc @@ -113,7 +113,9 @@ Client::Client(int fd) : fd(fd), notification_fd(-1) {} PlasmaStore::PlasmaStore(EventLoop* loop, std::string directory, bool hugepages_enabled, const std::string& socket_name, std::shared_ptr external_store) - : loop_(loop), eviction_policy_(&store_info_), external_store_(external_store) { + : loop_(loop), + eviction_policy_(&store_info_, PlasmaAllocator::GetFootprintLimit()), + external_store_(external_store) { store_info_.directory = directory; store_info_.hugepages_enabled = hugepages_enabled; #ifdef PLASMA_CUDA @@ -138,9 +140,7 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt // that the object is being used. if (entry->ref_count == 0) { // Tell the eviction policy that this object is being used. - std::vector objects_to_evict; - eviction_policy_.BeginObjectAccess(object_id, &objects_to_evict); - EvictObjects(objects_to_evict); + eviction_policy_.BeginObjectAccess(object_id); } // Increase reference count. entry->ref_count++; @@ -151,7 +151,16 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt // Allocate memory uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size, - ptrdiff_t* offset) { + ptrdiff_t* offset, Client* client, bool is_create) { + // First free up space from the client's LRU queue if quota enforcement is on. + std::vector client_objects_to_evict; + bool quota_ok = eviction_policy_.EnforcePerClientQuota(client, size, is_create, + &client_objects_to_evict); + if (!quota_ok) { + return nullptr; + } + EvictObjects(client_objects_to_evict); + // Try to evict objects until there is enough space.
uint8_t* pointer = nullptr; while (true) { @@ -223,7 +232,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si auto total_size = data_size + metadata_size; if (device_num == 0) { - pointer = AllocateMemory(total_size, &fd, &map_size, &offset); + pointer = AllocateMemory(total_size, &fd, &map_size, &offset, client, true); if (!pointer) { ARROW_LOG(ERROR) << "Not enough memory to create the object " << object_id.hex() << ", data_size=" << data_size @@ -274,7 +283,7 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si // Notify the eviction policy that this object was created. This must be done // immediately before the call to AddToClientObjectIds so that the // eviction policy does not have an opportunity to evict the object. - eviction_policy_.ObjectCreated(object_id); + eviction_policy_.ObjectCreated(object_id, client, true); // Record that this client is using this object. AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); return PlasmaError::OK; @@ -447,11 +456,11 @@ void PlasmaStore::ProcessGetRequest(Client* client, ARROW_CHECK(!entry->pointer); entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd, - &entry->map_size, &entry->offset); + &entry->map_size, &entry->offset, client, false); if (entry->pointer) { entry->state = ObjectState::PLASMA_CREATED; entry->create_time = std::time(nullptr); - eviction_policy_.ObjectCreated(object_id); + eviction_policy_.ObjectCreated(object_id, client, false); AddToClientObjectIds(object_id, store_info_.objects[object_id].get(), client); evicted_ids.push_back(object_id); evicted_entries.push_back(entry); @@ -525,9 +534,7 @@ int PlasmaStore::RemoveFromClientObjectIds(const ObjectID& object_id, if (entry->ref_count == 0) { if (deletion_cache_.count(object_id) == 0) { // Tell the eviction policy that this object is no longer being used. 
- std::vector objects_to_evict; - eviction_policy_.EndObjectAccess(object_id, &objects_to_evict); - EvictObjects(objects_to_evict); + eviction_policy_.EndObjectAccess(object_id); } else { // Above code does not really delete an object. Instead, it just put an // object to LRU cache which will be cleaned when the memory is not enough. @@ -651,6 +658,10 @@ PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) { } void PlasmaStore::EvictObjects(const std::vector& object_ids) { + if (object_ids.size() == 0) { + return; + } + std::vector> evicted_object_data; std::vector evicted_entries; for (const auto& object_id : object_ids) { @@ -721,6 +732,7 @@ void PlasmaStore::DisconnectClient(int client_fd) { ARROW_LOG(INFO) << "Disconnecting client on fd " << client_fd; // Release all the objects that the client was using. auto client = it->second.get(); + eviction_policy_.ClientDisconnected(client); std::unordered_map sealed_objects; for (const auto& object_id : client->object_ids) { auto it = store_info_.objects.find(object_id); @@ -1011,6 +1023,21 @@ Status PlasmaStore::ProcessMessage(Client* client) { ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd; DisconnectClient(client->fd); break; + case fb::MessageType::PlasmaSetOptionsRequest: { + std::string client_name; + int64_t output_memory_quota; + RETURN_NOT_OK( + ReadSetOptionsRequest(input, input_size, &client_name, &output_memory_quota)); + client->name = client_name; + bool success = eviction_policy_.SetClientQuota(client, output_memory_quota); + HANDLE_SIGPIPE(SendSetOptionsReply(client->fd, success ? PlasmaError::OK + : PlasmaError::OutOfMemory), + client->fd); + } break; + case fb::MessageType::PlasmaGetDebugStringRequest: { + HANDLE_SIGPIPE(SendGetDebugStringReply(client->fd, eviction_policy_.DebugString()), + client->fd); + } break; default: // This code should be unreachable. 
ARROW_CHECK(0); diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h index 26b49f02e72b..53564fbc7ba1 100644 --- a/cpp/src/plasma/store.h +++ b/cpp/src/plasma/store.h @@ -27,10 +27,10 @@ #include "plasma/common.h" #include "plasma/events.h" -#include "plasma/eviction_policy.h" #include "plasma/external_store.h" #include "plasma/plasma.h" #include "plasma/protocol.h" +#include "plasma/quota_aware_policy.h" namespace arrow { class Status; @@ -54,24 +54,6 @@ struct NotificationQueue { std::deque> object_notifications; }; -/// Contains all information that is associated with a Plasma store client. -struct Client { - explicit Client(int fd); - - /// The file descriptor used to communicate with the client. - int fd; - - /// Object ids that are used by this client. - std::unordered_set object_ids; - - /// File descriptors that are used by this client. - std::unordered_set used_fds; - - /// The file descriptor used to push notifications to client. This is only valid - /// if client subscribes to plasma store. -1 indicates invalid. - int notification_fd; -}; - class PlasmaStore { public: using NotificationMap = std::unordered_map; @@ -214,7 +196,8 @@ class PlasmaStore { void EraseFromObjectTable(const ObjectID& object_id); - uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset); + uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset, + Client* client, bool is_create); #ifdef PLASMA_CUDA Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer, std::shared_ptr* out_ipc_handle); @@ -228,7 +211,7 @@ class PlasmaStore { /// to the eviction policy. PlasmaStoreInfo store_info_; /// The state that is managed by the eviction policy. - EvictionPolicy eviction_policy_; + QuotaAwarePolicy eviction_policy_; /// Input buffer. This is allocated only once to avoid mallocs for every /// call to process_message. 
std::vector input_buffer_; diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc index deffde57976a..2bb7dc778a47 100644 --- a/cpp/src/plasma/test/client_tests.cc +++ b/cpp/src/plasma/test/client_tests.cc @@ -169,6 +169,254 @@ TEST_F(TestPlasmaStore, SealErrorsTest) { ARROW_CHECK_OK(client_.Release(object_id)); } +TEST_F(TestPlasmaStore, SetQuotaBasicTest) { + bool has_object = false; + ObjectID id1 = random_object_id(); + ObjectID id2 = random_object_id(); + + ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); + std::vector big_data(3 * 1024 * 1024, 0); + + // First object fits + CreateObject(client_, id1, {42}, big_data, true); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + + // Evicts first object + CreateObject(client_, id2, {42}, big_data, true); + ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_TRUE(has_object); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_FALSE(has_object); + + // Too big to fit in quota at all + std::shared_ptr data_buffer; + ASSERT_FALSE( + client_.Create(random_object_id(), 7 * 1024 * 1024, {}, 0, &data_buffer).ok()); + ASSERT_TRUE( + client_.Create(random_object_id(), 4 * 1024 * 1024, {}, 0, &data_buffer).ok()); +} + +TEST_F(TestPlasmaStore, SetQuotaProvidesIsolationFromOtherClients) { + bool has_object = false; + ObjectID id1 = random_object_id(); + ObjectID id2 = random_object_id(); + + std::vector big_data(3 * 1024 * 1024, 0); + + // First object, created without quota + CreateObject(client_, id1, {42}, big_data, true); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + + // Second client creates a bunch of objects + for (int i = 0; i < 10; i++) { + CreateObject(client2_, random_object_id(), {42}, big_data, true); + } + + // First client's object is evicted + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_FALSE(has_object); + + // Try again with quota enabled + 
ARROW_CHECK_OK(client_.SetClientOptions("client1", 5 * 1024 * 1024)); + CreateObject(client_, id2, {42}, big_data, true); + ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_TRUE(has_object); + + // Second client creates a bunch of objects + for (int i = 0; i < 10; i++) { + CreateObject(client2_, random_object_id(), {42}, big_data, true); + } + + // First client's object is not evicted + ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_TRUE(has_object); +} + +TEST_F(TestPlasmaStore, SetQuotaProtectsOtherClients) { + bool has_object = false; + ObjectID id1 = random_object_id(); + + std::vector big_data(3 * 1024 * 1024, 0); + + // First client has no quota + CreateObject(client_, id1, {42}, big_data, true); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + + // Second client creates a bunch of objects under a quota + ARROW_CHECK_OK(client2_.SetClientOptions("client2", 5 * 1024 * 1024)); + for (int i = 0; i < 10; i++) { + CreateObject(client2_, random_object_id(), {42}, big_data, true); + } + + // First client's object is NOT evicted + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); +} + +TEST_F(TestPlasmaStore, SetQuotaCannotExceedSeventyPercentMemory) { + ASSERT_FALSE(client_.SetClientOptions("client1", 8 * 1024 * 1024).ok()); + ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); + // cannot set quota twice + ASSERT_FALSE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); + // cannot exceed 70% summed + ASSERT_FALSE(client2_.SetClientOptions("client2", 3 * 1024 * 1024).ok()); + ASSERT_TRUE(client2_.SetClientOptions("client2", 1 * 1024 * 1024).ok()); +} + +TEST_F(TestPlasmaStore, SetQuotaDemotesPinnedObjectsToGlobalLRU) { + bool has_object = false; + ASSERT_TRUE(client_.SetClientOptions("client1", 5 * 1024 * 1024).ok()); + + ObjectID id1 = random_object_id(); + ObjectID id2 = random_object_id(); + std::vector big_data(3 * 1024 * 1024, 0); + + // Quota 
is not enough to fit both id1 and id2, but global LRU is + CreateObject(client_, id1, {42}, big_data, false); + CreateObject(client_, id2, {42}, big_data, false); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_TRUE(has_object); + + // Release both objects. Now id1 is in global LRU and id2 is in quota + ARROW_CHECK_OK(client_.Release(id1)); + ARROW_CHECK_OK(client_.Release(id2)); + + // This flushes id1 from the object store + for (int i = 0; i < 10; i++) { + CreateObject(client2_, random_object_id(), {42}, big_data, true); + } + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_FALSE(has_object); + ARROW_CHECK_OK(client_.Contains(id2, &has_object)); + ASSERT_TRUE(has_object); +} + +TEST_F(TestPlasmaStore, SetQuotaDemoteDisconnectToGlobalLRU) { + bool has_object = false; + PlasmaClient local_client; + ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); + ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + + ObjectID id1 = random_object_id(); + std::vector big_data(3 * 1024 * 1024, 0); + + // First object fits + CreateObject(local_client, id1, {42}, big_data, true); + for (int i = 0; i < 10; i++) { + CreateObject(client_, random_object_id(), {42}, big_data, true); + } + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + + // Object is still present after disconnect + ARROW_CHECK_OK(local_client.Disconnect()); + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_TRUE(has_object); + + // But is eligible for global LRU + for (int i = 0; i < 10; i++) { + CreateObject(client_, random_object_id(), {42}, big_data, true); + } + ARROW_CHECK_OK(client_.Contains(id1, &has_object)); + ASSERT_FALSE(has_object); +} + +TEST_F(TestPlasmaStore, SetQuotaCleanupObjectMetadata) { + PlasmaClient local_client; + ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); + 
ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + + ObjectID id0 = random_object_id(); + ObjectID id1 = random_object_id(); + ObjectID id2 = random_object_id(); + ObjectID id3 = random_object_id(); + std::vector big_data(3 * 1024 * 1024, 0); + std::vector small_data(1 * 1024 * 1024, 0); + CreateObject(local_client, id0, {42}, small_data, false); + CreateObject(local_client, id1, {42}, big_data, true); + CreateObject(local_client, id2, {42}, big_data, + true); // spills id0 to global, evicts id1 + CreateObject(local_client, id3, {42}, small_data, false); + + ASSERT_TRUE(client_.DebugString().find("num clients with quota: 1") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("quota map size: 2") != std::string::npos); + ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 1") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(local) num objects: 2") != std::string::npos); + + // release id0 + ARROW_CHECK_OK(local_client.Release(id0)); + ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 1") != + std::string::npos); + + // delete everything + ARROW_CHECK_OK(local_client.Delete(id0)); + ARROW_CHECK_OK(local_client.Delete(id2)); + ARROW_CHECK_OK(local_client.Delete(id3)); + ARROW_CHECK_OK(local_client.Release(id3)); + ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos); + ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(local) num objects: 0") != std::string::npos); + + ARROW_CHECK_OK(local_client.Disconnect()); + int tries = 10; // wait for disconnect to complete + while (tries > 0 && + client_.DebugString().find("num clients with quota: 0") == std::string::npos) { + 
std::this_thread::sleep_for(std::chrono::milliseconds(200)); + tries -= 1; + } + ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) used: 0%") != std::string::npos); +} + +TEST_F(TestPlasmaStore, SetQuotaCleanupClientDisconnect) { + PlasmaClient local_client; + ARROW_CHECK_OK(local_client.Connect(store_socket_name_, "")); + ARROW_CHECK_OK(local_client.SetClientOptions("local", 5 * 1024 * 1024)); + + ObjectID id1 = random_object_id(); + ObjectID id2 = random_object_id(); + ObjectID id3 = random_object_id(); + std::vector big_data(3 * 1024 * 1024, 0); + std::vector small_data(1 * 1024 * 1024, 0); + CreateObject(local_client, id1, {42}, big_data, true); + CreateObject(local_client, id2, {42}, big_data, true); + CreateObject(local_client, id3, {42}, small_data, false); + + ARROW_CHECK_OK(local_client.Disconnect()); + int tries = 10; // wait for disconnect to complete + while (tries > 0 && + client_.DebugString().find("num clients with quota: 0") == std::string::npos) { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + tries -= 1; + } + ASSERT_TRUE(client_.DebugString().find("num clients with quota: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("quota map size: 0") != std::string::npos); + ASSERT_TRUE(client_.DebugString().find("pinned quota map size: 0") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) num objects: 2") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) capacity: 10000000") != + std::string::npos); + ASSERT_TRUE(client_.DebugString().find("(global lru) used: 41.9431%") != + std::string::npos); +} + TEST_F(TestPlasmaStore, DeleteTest) { ObjectID object_id = random_object_id(); diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx index 
7e994c3ee079..f09cd54be5d2 100644 --- a/python/pyarrow/_plasma.pyx +++ b/python/pyarrow/_plasma.pyx @@ -138,6 +138,11 @@ cdef extern from "plasma/client.h" nogil: CStatus Delete(const c_vector[CUniqueID] object_ids) + CStatus SetClientOptions(const c_string& client_name, + int64_t limit_output_memory) + + c_string DebugString() + int64_t store_capacity() cdef extern from "plasma/client.h" nogil: @@ -741,6 +746,19 @@ cdef class PlasmaClient: with nogil: plasma_check_status(self.client.get().Delete(ids)) + def set_client_options(self, client_name, int64_t limit_output_memory): + cdef c_string name + name = client_name.encode() + with nogil: + plasma_check_status( + self.client.get().SetClientOptions(name, limit_output_memory)) + + def debug_string(self): + cdef c_string result + with nogil: + result = self.client.get().DebugString() + return result.decode() + def list(self): """ Experimental: List the objects in the store.