Skip to content

Commit

Permalink
queue outgoing messages even w/o active connection
Browse files Browse the repository at this point in the history
this patch changes the behavior of the default protocol so that outgoing
messages always use a queue, even if there is no active connection to
the receiver; queued messages are sent eventually if an active
connection to the receiving node is available
  • Loading branch information
Neverlord committed Nov 3, 2012
1 parent 8c71fa2 commit 071dbe1
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 43 deletions.
1 change: 1 addition & 0 deletions cppa.files
Expand Up @@ -296,3 +296,4 @@ cppa/wildcard_position.hpp
cppa/network/message_header.hpp
src/message_header.cpp
cppa/qtsupport/actor_widget_mixin.hpp
cppa/network/default_message_queue.hpp
5 changes: 5 additions & 0 deletions cppa/intrusive_ptr.hpp
Expand Up @@ -112,6 +112,11 @@ class intrusive_ptr : util::comparable<intrusive_ptr<T> >,
set_ptr(new_value);
}

template<typename... Args>
void emplace(Args&&... args) {
reset(new T(std::forward<Args>(args)...));
}

intrusive_ptr& operator=(pointer ptr) {
reset(ptr);
return *this;
Expand Down
73 changes: 73 additions & 0 deletions cppa/network/default_message_queue.hpp
@@ -0,0 +1,73 @@
/******************************************************************************\
* ___ __ *
* /\_ \ __/\ \ *
* \//\ \ /\_\ \ \____ ___ _____ _____ __ *
* \ \ \ \/\ \ \ '__`\ /'___\/\ '__`\/\ '__`\ /'__`\ *
* \_\ \_\ \ \ \ \L\ \/\ \__/\ \ \L\ \ \ \L\ \/\ \L\.\_ *
* /\____\\ \_\ \_,__/\ \____\\ \ ,__/\ \ ,__/\ \__/.\_\ *
* \/____/ \/_/\/___/ \/____/ \ \ \/ \ \ \/ \/__/\/_/ *
* \ \_\ \ \_\ *
* \/_/ \/_/ *
* *
* Copyright (C) 2011, 2012 *
* Dominik Charousset <dominik.charousset@haw-hamburg.de> *
* *
* This file is part of libcppa. *
* libcppa is free software: you can redistribute it and/or modify it under *
* the terms of the GNU Lesser General Public License as published by the *
* Free Software Foundation, either version 3 of the License *
* or (at your option) any later version. *
* *
* libcppa is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *
* See the GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with libcppa. If not, see <http://www.gnu.org/licenses/>. *
\******************************************************************************/


#ifndef CPPA_MESSAGE_QUEUE_HPP
#define CPPA_MESSAGE_QUEUE_HPP

#include "cppa/any_tuple.hpp"
#include "cppa/ref_counted.hpp"
#include "cppa/network/message_header.hpp"

namespace cppa { namespace network {

class default_message_queue : public ref_counted {

public:

typedef std::pair<message_header,any_tuple> value_type;

typedef value_type& reference;

template<typename... Args>
void emplace(Args&&... args) {
m_impl.emplace_back(std::forward<Args>(args)...);
}

inline bool empty() const { return m_impl.empty(); }

inline value_type pop() {
value_type result(std::move(m_impl.front()));
m_impl.erase(m_impl.begin());
return std::move(result);
}

private:

std::vector<value_type> m_impl;

};

typedef intrusive_ptr<default_message_queue> default_message_queue_ptr;

} } // namespace cppa::network



#endif // CPPA_MESSAGE_QUEUE_HPP
19 changes: 19 additions & 0 deletions cppa/network/default_peer.hpp
Expand Up @@ -45,6 +45,7 @@
#include "cppa/network/output_stream.hpp"
#include "cppa/network/continuable_reader.hpp"
#include "cppa/network/continuable_writer.hpp"
#include "cppa/network/default_message_queue.hpp"

namespace cppa { namespace network {

Expand All @@ -55,6 +56,8 @@ class default_peer : public continuable_reader, public continuable_writer {
typedef continuable_reader lsuper;
typedef continuable_writer rsuper;

friend class default_protocol;

public:

default_peer(default_protocol* parent,
Expand All @@ -78,6 +81,10 @@ class default_peer : public continuable_reader, public continuable_writer {
return *m_node;
}

inline bool has_unwritten_data() const {
return m_has_unwritten_data;
}

protected:

~default_peer();
Expand Down Expand Up @@ -108,6 +115,16 @@ class default_peer : public continuable_reader, public continuable_writer {
util::buffer m_rd_buf;
util::buffer m_wr_buf;

default_message_queue_ptr m_queue;

inline default_message_queue& queue() {
return *m_queue;
}

inline void set_queue(const default_message_queue_ptr& queue) {
m_queue = queue;
}

// if this peer was created using remote_actor(), then m_doorman will
// point to the published actor of the remote node
bool m_erase_on_last_proxy_exited;
Expand All @@ -128,12 +145,14 @@ class default_peer : public continuable_reader, public continuable_writer {
enqueue({nullptr, nullptr}, msg);
}

/*
template<typename Arg0, typename Arg1, typename... Args>
inline void enqueue(Arg0&& arg0, Arg1&& arg1, Args&&... args) {
enqueue(make_any_tuple(std::forward<Arg0>(arg0),
std::forward<Arg1>(arg1),
std::forward<Args>(args)...));
}
*/

};

Expand Down
17 changes: 13 additions & 4 deletions cppa/network/default_protocol.hpp
Expand Up @@ -39,8 +39,9 @@

#include "cppa/network/protocol.hpp"
#include "cppa/network/default_peer.hpp"
#include "cppa/network/default_actor_addressing.hpp"
#include "cppa/network/default_peer_acceptor.hpp"
#include "cppa/network/default_message_queue.hpp"
#include "cppa/network/default_actor_addressing.hpp"

namespace cppa { namespace network {

Expand Down Expand Up @@ -70,11 +71,15 @@ class default_protocol : public protocol {

default_peer_ptr get_peer(const process_information& node);

void enqueue(const process_information& node,
const message_header& hdr,
any_tuple msg);

void new_peer(const input_stream_ptr& in,
const output_stream_ptr& out,
const process_information_ptr& node = nullptr);

void erase_peer(const default_peer_ptr& pptr);
void last_proxy_exited(const default_peer_ptr& pptr);

void continue_writer(const default_peer_ptr& pptr);

Expand All @@ -83,10 +88,14 @@ class default_protocol : public protocol {

private:

default_actor_addressing m_addressing;
struct peer_entry {
default_peer_ptr impl;
default_message_queue_ptr queue;
};

default_actor_addressing m_addressing;
std::map<actor_ptr,std::vector<default_peer_acceptor_ptr> > m_acceptors;
std::map<process_information,default_peer_ptr> m_peers;
std::map<process_information,peer_entry> m_peers;

};

Expand Down
10 changes: 5 additions & 5 deletions src/default_actor_addressing.cpp
Expand Up @@ -138,11 +138,11 @@ void default_actor_addressing::put(const process_information& node,
auto i = submap.find(aid);
if (i == submap.end()) {
submap.insert(make_pair(aid, proxy));
auto p = m_parent->get_peer(node);
CPPA_LOG_WARNING_IF(!p, "put a proxy for an unknown peer");
if (p) {
p->enqueue({nullptr, nullptr}, make_any_tuple(atom("MONITOR"), process_information::get(), aid));
}
m_parent->enqueue(node,
{nullptr, nullptr},
make_any_tuple(atom("MONITOR"),
process_information::get(),
aid));
}
else {
CPPA_LOG_ERROR("a proxy for " << aid << ":" << to_string(node)
Expand Down
15 changes: 4 additions & 11 deletions src/default_actor_proxy.cpp
Expand Up @@ -58,27 +58,20 @@ default_actor_proxy::~default_actor_proxy() {
auto p = proto->get_peer(*node);
if (p && p->erase_on_last_proxy_exited()) {
if (proto->addressing()->count_proxies(*node) == 0) {
proto->erase_peer(p);
proto->last_proxy_exited(p);
}
}
});
}

void default_actor_proxy::forward_msg(const actor_ptr& sender, any_tuple msg, message_id_t mid) {
CPPA_LOG_TRACE("");
message_header hdr{sender, this, mid};
auto node = m_pinf;
actor_ptr receiver = this;
auto proto = m_proto;
m_proto->run_later([proto, node, sender, receiver, msg, mid] {
m_proto->run_later([hdr, msg, node, proto] {
CPPA_LOGF_TRACE("lambda from default_actor_proxy::forward_msg");
/*
<< "node = " << to_string(*node)
<< ", sender =" << to_string(sender)
<< ", receiver = " << to_string(receiver)
<< ", proto = " << to_string(proto->identifier()));
*/
auto p = proto->get_peer(*node);
if (p) p->enqueue({sender, receiver, mid}, msg);
proto->enqueue(*node, hdr, msg);
});
}

Expand Down
23 changes: 16 additions & 7 deletions src/default_peer.cpp
Expand Up @@ -148,7 +148,7 @@ continue_reading_result default_peer::continue_reading() {
<< ", what(): " << e.what());
return read_failure;
}
CPPA_LOG_DEBUG("deserialized: " << to_string(msg));
CPPA_LOG_DEBUG("deserialized: " << to_string(hdr) << " " << to_string(msg));
//DEBUG("<-- " << to_string(msg));
match(msg) (
// monitor messages are sent automatically whenever
Expand Down Expand Up @@ -208,7 +208,7 @@ void default_peer::monitor(const actor_ptr&,
// this actor already finished execution;
// reply with KILL_PROXY message
// get corresponding peer
enqueue(atom("KILL_PROXY"), pself, aid, entry.second);
enqueue(make_any_tuple(atom("KILL_PROXY"), pself, aid, entry.second));
}
}
else {
Expand All @@ -218,7 +218,7 @@ void default_peer::monitor(const actor_ptr&,
proto->run_later([=] {
CPPA_LOGF_TRACE("lambda from default_peer::monitor");
auto p = proto->get_peer(*node);
if (p) p->enqueue(atom("KILL_PROXY"), pself, aid, reason);
if (p) p->enqueue(make_any_tuple(atom("KILL_PROXY"), pself, aid, reason));
});
});
}
Expand Down Expand Up @@ -306,7 +306,8 @@ void default_peer::unlink(const actor_ptr& sender, const actor_ptr& ptr) {

continue_writing_result default_peer::continue_writing() {
CPPA_LOG_TRACE("");
if (m_has_unwritten_data) {
CPPA_LOG_DEBUG_IF(!m_has_unwritten_data, "nothing to write (done)");
while (m_has_unwritten_data) {
size_t written;
try { written = m_out->write_some(m_wr_buf.data(), m_wr_buf.size()); }
catch (exception& e) {
Expand All @@ -325,10 +326,18 @@ continue_writing_result default_peer::continue_writing() {
m_wr_buf.reset();
m_has_unwritten_data = false;
CPPA_LOG_DEBUG("write done, " << written << "bytes written");
return write_done;
}
// try to write next message in queue
while (!m_has_unwritten_data && !queue().empty()) {
auto tmp = queue().pop();
enqueue(tmp.first, tmp.second);
}
}
if (erase_on_last_proxy_exited() && !has_unwritten_data()) {
if (m_parent->addressing()->count_proxies(*m_node) == 0) {
m_parent->last_proxy_exited(this);
}
}
CPPA_LOG_DEBUG("nothing to write (done)");
return write_done;
}

Expand All @@ -350,7 +359,7 @@ void default_peer::enqueue(const message_header& hdr, const any_tuple& msg) {
<< endl;
return;
}
CPPA_LOG_DEBUG("serialized: " << to_string(msg));
CPPA_LOG_DEBUG("serialized: " << to_string(hdr) << " " << to_string(msg));
size = (m_wr_buf.size() - before) - sizeof(std::uint32_t);
// update size in buffer
memcpy(m_wr_buf.data() + before, &size, sizeof(std::uint32_t));
Expand Down
57 changes: 41 additions & 16 deletions src/default_protocol.cpp
Expand Up @@ -114,25 +114,47 @@ void default_protocol::unpublish(const actor_ptr& whom) {
void default_protocol::register_peer(const process_information& node,
default_peer* ptr) {
CPPA_LOG_TRACE("node = " << to_string(node) << ", ptr = " << ptr);
auto& ptrref = m_peers[node];
if (ptrref) {
CPPA_LOG_INFO("peer " << to_string(node) << " already defined");
auto& entry = m_peers[node];
if (entry.impl == nullptr) {
if (entry.queue == nullptr) entry.queue.emplace();
ptr->set_queue(entry.queue);
entry.impl.reset(ptr);
if (!entry.queue->empty()) {
auto tmp = entry.queue->pop();
ptr->enqueue(tmp.first, tmp.second);
}
}
else ptrref = ptr;
else { CPPA_LOG_ERROR("peer " << to_string(node) << " already defined"); }
}

default_peer_ptr default_protocol::get_peer(const process_information& n) {
CPPA_LOG_TRACE("n = " << to_string(n));
auto e = end(m_peers);
auto i = m_peers.find(n);
if (i != e) {
CPPA_LOG_DEBUG("result = " << i->second.get());
return i->second;
if (i != m_peers.end()) {
CPPA_LOG_DEBUG("result = " << i->second.impl.get());
return i->second.impl;
}
CPPA_LOG_DEBUG("result = nullptr");
return nullptr;
}

void default_protocol::enqueue(const process_information& node,
const message_header& hdr,
any_tuple msg) {
auto& entry = m_peers[node];
if (entry.impl) {
CPPA_REQUIRE(entry.queue != nullptr);
if (!entry.impl->has_unwritten_data()) {
CPPA_REQUIRE(entry.queue->empty());
entry.impl->enqueue(hdr, msg);
return;
}
}
if (entry.queue == nullptr) entry.queue.emplace();
entry.queue->emplace(hdr, msg);
}


actor_ptr default_protocol::remote_actor(variant_args args) {
CPPA_LOG_TRACE("args.size() = " << args.size());
CPPA_REQUIRE(args.size() == 2);
Expand Down Expand Up @@ -187,17 +209,20 @@ actor_ptr default_protocol::remote_actor(io_stream_ptr_pair io,
return result->value;
}

void default_protocol::erase_peer(const default_peer_ptr& pptr) {
void default_protocol::last_proxy_exited(const default_peer_ptr& pptr) {
CPPA_REQUIRE(pptr != nullptr);
CPPA_LOG_TRACE("pptr = " << pptr.get()
<< ", pptr->node() = " << to_string(pptr->node()));
stop_reader(pptr.get());
auto i = m_peers.find(pptr->node());
if (i != m_peers.end()) {
CPPA_LOG_DEBUG_IF(i->second != pptr, "node " << to_string(pptr->node())
<< " does not exist in m_peers");
if (i->second == pptr) {
m_peers.erase(i);
if (pptr->erase_on_last_proxy_exited() && pptr->queue().empty()) {
stop_reader(pptr.get());
auto i = m_peers.find(pptr->node());
if (i != m_peers.end()) {
CPPA_LOG_DEBUG_IF(i->second.impl != pptr,
"node " << to_string(pptr->node())
<< " does not exist in m_peers");
if (i->second.impl == pptr) {
m_peers.erase(i);
}
}
}
}
Expand Down

0 comments on commit 071dbe1

Please sign in to comment.