Skip to content

Commit

Permalink
use per-thread recursive_queue_node caching
Browse files Browse the repository at this point in the history
this patch replaces the per-actor caching of `recursive_queue_node`
instances with a per-thread caching strategy that also allocates
nodes more efficiently
  • Loading branch information
Neverlord committed Nov 5, 2012
1 parent 41ebf87 commit 745e012
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 51 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Expand Up @@ -116,6 +116,7 @@ set(LIBCPPA_SRC
src/local_actor.cpp
src/logging.cpp
src/match.cpp
src/memory.cpp
src/message_header.cpp
src/middleman.cpp
src/object.cpp
Expand Down
2 changes: 2 additions & 0 deletions cppa.files
Expand Up @@ -275,3 +275,5 @@ unit_testing/test__tuple.cpp
unit_testing/test__type_list.cpp
unit_testing/test__uniform_type.cpp
unit_testing/test__yield_interface.cpp
cppa/detail/memory.hpp
src/memory.cpp
3 changes: 2 additions & 1 deletion cppa/actor_companion_mixin.hpp
Expand Up @@ -42,6 +42,7 @@
#include "cppa/util/shared_spinlock.hpp"
#include "cppa/util/shared_lock_guard.hpp"

#include "cppa/detail/memory.hpp"
#include "cppa/detail/abstract_actor.hpp"

namespace cppa {
Expand All @@ -53,7 +54,7 @@ class actor_companion_mixin : public Base {

public:

typedef std::unique_ptr<detail::recursive_queue_node> message_pointer;
typedef std::unique_ptr<detail::recursive_queue_node,detail::disposer> message_pointer;

template<typename... Args>
actor_companion_mixin(Args&&... args) : super(std::forward<Args>(args)...) {
Expand Down
40 changes: 6 additions & 34 deletions cppa/detail/abstract_actor.hpp
Expand Up @@ -47,6 +47,7 @@
#include "cppa/local_actor.hpp"
#include "cppa/attachable.hpp"
#include "cppa/exit_reason.hpp"
#include "cppa/detail/memory.hpp"
#include "cppa/util/shared_spinlock.hpp"

#include "cppa/detail/recursive_queue_node.hpp"
Expand Down Expand Up @@ -182,48 +183,19 @@ class abstract_actor : public abstract_actor_base<Base, std::is_base_of<local_ac
protected:

mailbox_type m_mailbox;
util::fixed_vector<mailbox_element*, 10> m_nodes;
util::shared_spinlock m_nodes_lock;

typedef std::lock_guard<util::shared_spinlock> lock_type;

inline mailbox_element* fetch_node(actor* sender,
any_tuple msg,
message_id_t id = message_id_t()) {
mailbox_element* result = nullptr;
{ // lifetime scope of guard
lock_type guard{m_nodes_lock};
if (!m_nodes.empty()) {
result = m_nodes.back();
m_nodes.pop_back();
}
}
if (result) result->reset(sender, std::move(msg), id);
else result = new mailbox_element(sender, std::move(msg), id);
auto result = memory::new_queue_node();
result->reset(sender, std::move(msg), id);
return result;
}

inline void release_node(mailbox_element* node) {
// prevent
node->msg.reset();
{ // lifetime scope of guard
lock_type guard{m_nodes_lock};
if (m_nodes.full() == false) {
m_nodes.push_back(node);
return;
}
}
delete node;
}

template<typename... Args>
abstract_actor(Args&&... args)
: super(std::forward<Args>(args)...),m_exit_reason(exit_reason::not_exited){
// pre-allocate some nodes
for (size_t i = 0; i < m_nodes.max_size() / 2; ++i) {
m_nodes.push_back(new mailbox_element);
}
}
: super(std::forward<Args>(args)...)
, m_exit_reason(exit_reason::not_exited){ }

void cleanup(std::uint32_t reason) {
if (reason == exit_reason::not_exited) return;
Expand Down Expand Up @@ -291,7 +263,7 @@ class abstract_actor : public abstract_actor_base<Base, std::is_base_of<local_ac

// true if the associated thread has finished execution
std::atomic<std::uint32_t> m_exit_reason;
// guards access to m_exited, m_subscriptions and m_links
// guards access to m_exited, m_subscriptions, and m_links
std::mutex m_mtx;
// links to other actors
std::vector<actor_ptr> m_links;
Expand Down
66 changes: 66 additions & 0 deletions cppa/detail/memory.hpp
@@ -0,0 +1,66 @@
/******************************************************************************\
* ___ __ *
* /\_ \ __/\ \ *
* \//\ \ /\_\ \ \____ ___ _____ _____ __ *
* \ \ \ \/\ \ \ '__`\ /'___\/\ '__`\/\ '__`\ /'__`\ *
* \_\ \_\ \ \ \ \L\ \/\ \__/\ \ \L\ \ \ \L\ \/\ \L\.\_ *
* /\____\\ \_\ \_,__/\ \____\\ \ ,__/\ \ ,__/\ \__/.\_\ *
* \/____/ \/_/\/___/ \/____/ \ \ \/ \ \ \/ \/__/\/_/ *
* \ \_\ \ \_\ *
* \/_/ \/_/ *
* *
* Copyright (C) 2011, 2012 *
* Dominik Charousset <dominik.charousset@haw-hamburg.de> *
* *
* This file is part of libcppa. *
* libcppa is free software: you can redistribute it and/or modify it under *
* the terms of the GNU Lesser General Public License as published by the *
* Free Software Foundation, either version 3 of the License *
* or (at your option) any later version. *
* *
* libcppa is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* See the GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with libcppa. If not, see <http://www.gnu.org/licenses/>. *
\******************************************************************************/


#ifndef CPPA_MEMORY_HPP
#define CPPA_MEMORY_HPP

namespace cppa { namespace detail {

class memory_cache;
class recursive_queue_node;

class memory {

memory() = delete;

friend class memory_cache;

public:

static recursive_queue_node* new_queue_node();

static void dispose(recursive_queue_node* ptr);

private:

static void destroy(recursive_queue_node* ptr);

};

struct disposer {
template<typename T>
void operator()(T* ptr) {
memory::dispose(ptr);
}
};

} } // namespace cppa::detail

#endif // CPPA_MEMORY_HPP
10 changes: 5 additions & 5 deletions cppa/detail/receive_policy.hpp
Expand Up @@ -40,6 +40,8 @@
#include "cppa/message_id.hpp"
#include "cppa/exit_reason.hpp"
#include "cppa/partial_function.hpp"

#include "cppa/detail/memory.hpp"
#include "cppa/detail/recursive_queue_node.hpp"

namespace cppa { namespace detail {
Expand Down Expand Up @@ -79,12 +81,10 @@ class receive_policy {
switch (this->handle_message(client, i->get(), fun,
awaited_response, policy)) {
case hm_msg_handled: {
client->release_node(i->release());
m_cache.erase(i);
return true;
}
case hm_drop_msg: {
client->release_node(i->release());
i = m_cache.erase(i);
break;
}
Expand All @@ -110,11 +110,11 @@ class receive_policy {
switch (this->handle_message(client, node, fun,
awaited_response, policy)) {
case hm_msg_handled: {
client->release_node(node);
memory::dispose(node);
return true;
}
case hm_drop_msg: {
client->release_node(node);
memory::dispose(node);
break;
}
case hm_cache_msg: {
Expand Down Expand Up @@ -195,7 +195,7 @@ class receive_policy {
typedef typename rp_flag<rp_nestable>::type nestable;
typedef typename rp_flag<rp_sequential>::type sequential;

std::list<std::unique_ptr<recursive_queue_node> > m_cache;
std::list<std::unique_ptr<recursive_queue_node,disposer> > m_cache;

template<class Client>
inline void handle_timeout(Client* client, behavior& bhvr) {
Expand Down
26 changes: 20 additions & 6 deletions cppa/detail/recursive_queue_node.hpp
Expand Up @@ -38,9 +38,21 @@

#include "cppa/message_id.hpp"

// needs access to constructor + destructor to initialize m_dummy_node
namespace cppa { class local_actor; }

namespace cppa { namespace detail {

struct recursive_queue_node {
class memory;
class recursive_queue_node_storage;

class recursive_queue_node {

friend class memory;
friend class local_actor;
friend class recursive_queue_node_storage;

public:

typedef recursive_queue_node* pointer;

Expand All @@ -63,16 +75,18 @@ struct recursive_queue_node {
msg = std::move(data);
}

inline recursive_queue_node() { reset(nullptr, message_id_t(), false); }

inline recursive_queue_node(actor* ptr, any_tuple&& data, message_id_t id = message_id_t())
: msg(std::move(data)) { reset(ptr, id, false); }

recursive_queue_node(recursive_queue_node&&) = delete;
recursive_queue_node(const recursive_queue_node&) = delete;
recursive_queue_node& operator=(recursive_queue_node&&) = delete;
recursive_queue_node& operator=(const recursive_queue_node&) = delete;

private:

inline recursive_queue_node() { reset(nullptr, message_id_t(), false); }
inline ~recursive_queue_node() { }

recursive_queue_node_storage* parent;

};

} } // namespace cppa::detail
Expand Down
131 changes: 131 additions & 0 deletions src/memory.cpp
@@ -0,0 +1,131 @@
/******************************************************************************\
* ___ __ *
* /\_ \ __/\ \ *
* \//\ \ /\_\ \ \____ ___ _____ _____ __ *
* \ \ \ \/\ \ \ '__`\ /'___\/\ '__`\/\ '__`\ /'__`\ *
* \_\ \_\ \ \ \ \L\ \/\ \__/\ \ \L\ \ \ \L\ \/\ \L\.\_ *
* /\____\\ \_\ \_,__/\ \____\\ \ ,__/\ \ ,__/\ \__/.\_\ *
* \/____/ \/_/\/___/ \/____/ \ \ \/ \ \ \/ \/__/\/_/ *
* \ \_\ \ \_\ *
* \/_/ \/_/ *
* *
* Copyright (C) 2011, 2012 *
* Dominik Charousset <dominik.charousset@haw-hamburg.de> *
* *
* This file is part of libcppa. *
* libcppa is free software: you can redistribute it and/or modify it under *
* the terms of the GNU Lesser General Public License as published by the *
* Free Software Foundation, either version 3 of the License *
* or (at your option) any later version. *
* *
* libcppa is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* See the GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with libcppa. If not, see <http://www.gnu.org/licenses/>. *
\******************************************************************************/


#include <vector>

#include "cppa/detail/memory.hpp"
#include "cppa/detail/recursive_queue_node.hpp"

using namespace std;

namespace cppa { namespace detail {

namespace {

pthread_key_t s_key;
pthread_once_t s_key_once = PTHREAD_ONCE_INIT;

constexpr size_t s_queue_node_storage_size = 20;
constexpr size_t s_max_cached_queue_nodes = 100;

} // namespace <anonymous>

class recursive_queue_node_storage : public ref_counted {

public:

recursive_queue_node_storage() {
for (auto& instance : m_instances) {
// each instance has a reference to its parent
instance.parent = this;
ref(); // deref() is called in memory::destroy
}
}

typedef recursive_queue_node* iterator;

iterator begin() { return std::begin(m_instances); }

iterator end() { return std::end(m_instances); }

private:

recursive_queue_node m_instances[s_queue_node_storage_size];

};

class memory_cache {

public:

vector<recursive_queue_node*> qnodes;

memory_cache() {
qnodes.reserve(s_max_cached_queue_nodes);
}

~memory_cache() {
for (auto node : qnodes) memory::destroy(node);
}

static void destructor(void* ptr) {
if (ptr) delete reinterpret_cast<memory_cache*>(ptr);
}

static void make() {
pthread_key_create(&s_key, destructor);
}

static memory_cache* get() {
pthread_once(&s_key_once, make);
auto result = static_cast<memory_cache*>(pthread_getspecific(s_key));
if (!result) {
result = new memory_cache;
pthread_setspecific(s_key, result);
}
return result;
}

};

recursive_queue_node* memory::new_queue_node() {
auto& vec = memory_cache::get()->qnodes;
if (!vec.empty()) {
recursive_queue_node* result = vec.back();
vec.pop_back();
return result;
}
auto storage = new recursive_queue_node_storage;
for (auto i = storage->begin(); i != storage->end(); ++i) vec.push_back(i);
return new_queue_node();
}

void memory::dispose(recursive_queue_node* ptr) {
auto& vec = memory_cache::get()->qnodes;
if (vec.size() < s_max_cached_queue_nodes) vec.push_back(ptr);
else destroy(ptr);
}

void memory::destroy(recursive_queue_node* ptr) {
auto parent = ptr->parent;
parent->deref();
}

} } // namespace cppa::detail

0 comments on commit 745e012

Please sign in to comment.