Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

fixed several issues related to message priorities

this patch fixes several bugs that are related to the recently added
support for priorities; it also refactors the remote actors unit test
and improves logging
  • Loading branch information...
commit 31699f7fac27219f63858d96cd855641a861babd 1 parent 7bf68b2
@Neverlord Neverlord authored
Showing with 477 additions and 380 deletions.
  1. +1 −1  CMakeLists.txt
  2. +0 −2  cppa/actor.hpp
  3. +3 −3 cppa/config.hpp
  4. +0 −1  cppa/context_switching_actor.hpp
  5. +8 −15 cppa/cppa.hpp
  6. +2 −2 cppa/detail/decorated_tuple.hpp
  7. +2 −3 cppa/detail/sync_request_bouncer.hpp
  8. +1 −1  cppa/detail/tdata.hpp
  9. +2 −2 cppa/detail/thread_pool_scheduler.hpp
  10. +9 −20 cppa/local_actor.hpp
  11. +14 −11 cppa/logging.hpp
  12. +4 −4 cppa/scheduler.hpp
  13. +33 −12 cppa/send.hpp
  14. +12 −11 cppa/spawn_options.hpp
  15. +10 −12 src/actor.cpp
  16. +11 −5 src/default_actor_proxy.cpp
  17. +0 −21 src/default_peer.cpp
  18. +1 −1  src/default_protocol.cpp
  19. +6 −6 src/group_manager.cpp
  20. +34 −14 src/local_actor.cpp
  21. +13 −3 src/logging.cpp
  22. +6 −0 src/middleman.cpp
  23. +1 −1  src/opencl/program.cpp
  24. +1 −1  src/scheduler.cpp
  25. +2 −0  src/self.cpp
  26. +9 −10 src/singleton_manager.cpp
  27. +52 −25 src/thread_pool_scheduler.cpp
  28. +0 −2  src/uniform_type_info.cpp
  29. +14 −17 unit_testing/ping_pong.cpp
  30. +2 −2 unit_testing/ping_pong.hpp
  31. +2 −2 unit_testing/test.hpp
  32. +221 −169 unit_testing/test_remote_actor.cpp
  33. +1 −0  unit_testing/test_spawn.cpp
  34. +0 −1  unit_testing/test_sync_send.cpp
View
2  CMakeLists.txt
@@ -72,7 +72,7 @@ endif ()
# set build type (evaluate ENABLE_DEBUG flag)
if (ENABLE_DEBUG)
set(CMAKE_BUILD_TYPE Debug)
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCPPA_ENABLE_DEBUG")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -DCPPA_DEBUG_MODE")
endif (ENABLE_DEBUG)
if (CPPA_LOG_LEVEL)
View
2  cppa/actor.hpp
@@ -74,8 +74,6 @@ class actor : public channel {
public:
- ~actor();
-
/**
* @brief Attaches @p ptr to this actor.
*
View
6 cppa/config.hpp
@@ -55,7 +55,7 @@
#include <cstdio>
#include <cstdlib>
-#ifdef CPPA_ENABLE_DEBUG
+#ifdef CPPA_DEBUG_MODE
#include <execinfo.h>
#define CPPA_REQUIRE__(stmt, file, line) \
@@ -70,9 +70,9 @@
if ((stmt) == false) { \
CPPA_REQUIRE__(#stmt, __FILE__, __LINE__); \
}((void) 0)
-#else // CPPA_ENABLE_DEBUG
+#else // CPPA_DEBUG_MODE
#define CPPA_REQUIRE(unused) ((void) 0)
-#endif // CPPA_ENABLE_DEBUG
+#endif // CPPA_DEBUG_MODE
#define CPPA_CRITICAL__(error, file, line) { \
printf("%s:%u: critical error: '%s'\n", file, line, error); \
View
1  cppa/context_switching_actor.hpp
@@ -38,7 +38,6 @@
#include "cppa/stacked.hpp"
#include "cppa/scheduled_actor.hpp"
-
#include "cppa/detail/receive_policy.hpp"
#include "cppa/detail/behavior_stack.hpp"
#include "cppa/detail/yield_interface.hpp"
View
23 cppa/cppa.hpp
@@ -34,6 +34,7 @@
#include <tuple>
#include <chrono>
#include <cstdint>
+#include <cstring>
#include <functional>
#include <type_traits>
@@ -450,7 +451,7 @@ operator<<(const intrusive_ptr<C>& whom, any_tuple what) {
}
inline const self_type& operator<<(const self_type& s, any_tuple what) {
- send_tuple(s, std::move(what));
+ send_tuple(s.get(), std::move(what));
return s;
}
@@ -458,17 +459,9 @@ inline const self_type& operator<<(const self_type& s, any_tuple what) {
* @}
*/
-/*
-// matches "send(this, ...)" and "send(self, ...)"
-inline void send_tuple(channel* whom, any_tuple what) {
- detail::send_impl(whom, std::move(what));
-}
-template<typename... Ts>
-inline void send(channel* whom, Ts&&... args) {
- detail::send_tpl_impl(whom, std::forward<Ts>(args)...);
-}
-*/
-inline actor_ptr eval_sopts(spawn_options opts, actor_ptr ptr) {
+inline actor_ptr eval_sopts(spawn_options opts, local_actor_ptr ptr) {
+ CPPA_LOGF_INFO("spawned new local actor with ID " << ptr->id()
+ << " of type " << detail::demangle(typeid(*ptr)));
if (has_monitor_flag(opts)) self->monitor(ptr);
if (has_link_flag(opts)) self->link_to(ptr);
return std::move(ptr);
@@ -548,9 +541,9 @@ actor_ptr spawn_in_group(const group_ptr& grp, Ts&&... args) {
*/
template<class Impl, spawn_options Options = no_spawn_options, typename... Ts>
actor_ptr spawn_in_group(const group_ptr& grp, Ts&&... args) {
- auto rawptr = detail::memory::create<Impl>(std::forward<Ts>(args)...);
- rawptr->join(grp);
- return eval_sopts(Options, get_scheduler()->exec(Options, rawptr));
+ auto ptr = make_counted<Impl>(std::forward<Ts>(args)...);
+ ptr->join(grp);
+ return eval_sopts(Options, get_scheduler()->exec(Options, ptr));
}
/** @} */
View
4 cppa/detail/decorated_tuple.hpp
@@ -107,7 +107,7 @@ class decorated_tuple : public abstract_tuple {
decorated_tuple(cow_pointer_type d, const vector_type& v)
: super(false)
, m_decorated(std::move(d)), m_mapping(v) {
-# ifdef CPPA_ENABLE_DEBUG
+# ifdef CPPA_DEBUG_MODE
const cow_pointer_type& ptr = m_decorated; // prevent detaching
# endif
CPPA_REQUIRE(ptr->size() >= sizeof...(Ts));
@@ -117,7 +117,7 @@ class decorated_tuple : public abstract_tuple {
decorated_tuple(cow_pointer_type d, size_t offset)
: super(false), m_decorated(std::move(d)) {
-# ifdef CPPA_ENABLE_DEBUG
+# ifdef CPPA_DEBUG_MODE
const cow_pointer_type& ptr = m_decorated; // prevent detaching
# endif
CPPA_REQUIRE((ptr->size() - offset) >= sizeof...(Ts));
View
5 cppa/detail/sync_request_bouncer.hpp
@@ -48,13 +48,12 @@ struct sync_request_bouncer {
inline void operator()(const actor_ptr& sender, const message_id& mid) const {
CPPA_REQUIRE(rsn != exit_reason::not_exited);
if (mid.is_request() && sender != nullptr) {
- actor_ptr nobody;
- sender->enqueue({nobody, sender, mid.response_id()},
+ sender->enqueue({nullptr, sender, mid.response_id()},
make_any_tuple(atom("EXITED"), rsn));
}
}
inline void operator()(const mailbox_element& e) const {
- (*this)(e.sender.get(), e.mid);
+ (*this)(e.sender, e.mid);
}
};
View
2  cppa/detail/tdata.hpp
@@ -329,7 +329,7 @@ struct tdata<Head, Tail...> : tdata<Tail...> {
}
inline void* mutable_at(size_t p) {
-# ifdef CPPA_ENABLE_DEBUG
+# ifdef CPPA_DEBUG_MODE
if (p == 0) {
if (std::is_same<decltype(ptr_to(head)), const void*>::value) {
throw std::logic_error{"mutable_at with const head"};
View
4 cppa/detail/thread_pool_scheduler.hpp
@@ -61,9 +61,9 @@ class thread_pool_scheduler : public scheduler {
void enqueue(scheduled_actor* what);
- actor_ptr exec(spawn_options opts, scheduled_actor_ptr ptr);
+ virtual local_actor_ptr exec(spawn_options opts, scheduled_actor_ptr ptr) override;
- actor_ptr exec(spawn_options opts, init_callback init_cb, void_function f);
+ virtual local_actor_ptr exec(spawn_options opts, init_callback init_cb, void_function f) override;
private:
View
29 cppa/local_actor.hpp
@@ -96,6 +96,8 @@ class local_actor : public extend<actor>::with<memory_cached> {
public:
+ ~local_actor();
+
/**
* @brief Causes this actor to subscribe to the group @p what.
*
@@ -267,14 +269,6 @@ class local_actor : public extend<actor>::with<memory_cached> {
inline void dequeue_response(behavior&&, message_id);
-/*
- template<bool Discard, typename... Ts>
- void become(behavior_policy<Discard>, Ts&&... args);
-
- template<typename T, typename... Ts>
- void become(T arg, Ts&&... args);
-*/
-
inline void do_unbecome();
local_actor(bool is_scheduled = false);
@@ -312,12 +306,19 @@ class local_actor : public extend<actor>::with<memory_cached> {
inline void do_become(const behavior& bhvr, bool discard_old);
+ const char* debug_name() const;
+
+ void debug_name(std::string str);
+
protected:
inline void remove_handler(message_id id);
void cleanup(std::uint32_t reason);
+ // used *only* when compiled in debug mode
+ union { std::string m_debug_name; };
+
// true if this actor uses the chained_send optimization
bool m_chaining;
@@ -403,18 +404,6 @@ inline actor_ptr& local_actor::last_sender() {
return m_current_node->sender;
}
-/*
-template<bool Discard, typename... Ts>
-inline void local_actor::become(behavior_policy<Discard>, Ts&&... args) {
- do_become(match_expr_convert(std::forward<Ts>(args)...), Discard);
-}
-
-template<typename T, typename... Ts>
-inline void local_actor::become(T arg, Ts&&... args) {
- do_become(match_expr_convert(arg, std::forward<Ts>(args)...), true);
-}
-*/
-
inline void local_actor::do_unbecome() {
m_bhvr_stack.pop_async_back();
}
View
25 cppa/logging.hpp
@@ -114,12 +114,8 @@ inline actor_ptr fwd_aptr(const self_type& s) {
return s.unchecked();
}
-inline actor_ptr fwd_aptr(actor* ptr) {
- return ptr;
-}
-
-inline actor_ptr fwd_aptr(const actor_ptr& ptr) {
- return ptr;
+inline actor_ptr fwd_aptr(actor_ptr ptr) {
+ return std::move(ptr);
}
struct oss_wr {
@@ -165,6 +161,13 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
#define CPPA_DEBUG 3
#define CPPA_TRACE 4
+#ifdef CPPA_DEBUG_MODE
+# define CPPA_SET_DEBUG_NAME(strstr) \
+ self->debug_name((::cppa::oss_wr{} << strstr).str());
+#else
+# define CPPA_SET_DEBUG_NAME(unused)
+#endif
+
#define CPPA_LVL_NAME0() "ERROR"
#define CPPA_LVL_NAME1() "WARN "
#define CPPA_LVL_NAME2() "INFO "
@@ -172,9 +175,9 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
#define CPPA_LVL_NAME4() "TRACE"
#ifndef CPPA_LOG_LEVEL
-# define CPPA_LOG_IMPL(lvlname, classname, funname, unused, message) {\
- std::cerr << "[" << lvlname << "] " << classname << "::" << funname << ": " \
- << message << "\nStack trace:\n"; \
+# define CPPA_LOG_IMPL(lvlname, classname, funname, unused, message) { \
+ std::cerr << "[" << lvlname << "] " << classname << "::" \
+ << funname << ": " << message << "\nStack trace:\n"; \
void *array[10]; \
size_t size = backtrace(array, 10); \
backtrace_symbols_fd(array, size, 2); \
@@ -254,7 +257,7 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
* @def CPPA_LOGMF
* @brief Logs a message inside a member function.
**/
-#define CPPA_LOGMF(level, actorptr, msg) \
+#define CPPA_LOGMF(level, actorptr, msg) \
CPPA_LOGC(level, CPPA_CLASS_NAME, __func__, actorptr, msg)
/**
@@ -287,7 +290,7 @@ oss_wr operator<<(oss_wr&& lhs, T rhs) {
/******************************************************************************
- * backward compatibility for version <= 0.6 *
+ * convenience macros *
******************************************************************************/
#define CPPA_LOG_ERROR(msg) CPPA_LOGMF(CPPA_ERROR, ::cppa::self, msg)
View
8 cppa/scheduler.hpp
@@ -165,19 +165,19 @@ class scheduler {
/**
* @brief Executes @p ptr in this scheduler.
*/
- virtual actor_ptr exec(spawn_options opts, scheduled_actor_ptr ptr) = 0;
+ virtual local_actor_ptr exec(spawn_options opts, scheduled_actor_ptr ptr) = 0;
/**
* @brief Creates a new actor from @p actor_behavior and executes it
* in this scheduler.
*/
- virtual actor_ptr exec(spawn_options opts,
+ virtual local_actor_ptr exec(spawn_options opts,
init_callback init_cb,
void_function actor_behavior) = 0;
template<typename F, typename T, typename... Ts>
- actor_ptr exec(spawn_options opts, init_callback cb,
- F f, T&& a0, Ts&&... as) {
+ local_actor_ptr exec(spawn_options opts, init_callback cb,
+ F f, T&& a0, Ts&&... as) {
return this->exec(opts, cb, std::bind(f, detail::fwd<T>(a0),
detail::fwd<Ts>(as)...));
}
View
45 cppa/send.hpp
@@ -46,19 +46,33 @@ namespace cppa {
* @{
*/
+struct destination_header {
+ channel_ptr receiver;
+ message_priority priority;
+ inline destination_header(const self_type& s)
+ : receiver(s), priority(message_priority::normal) { }
+ template<typename T>
+ inline destination_header(T dest)
+ : receiver(std::move(dest)), priority(message_priority::normal) { }
+ inline destination_header(channel_ptr dest, message_priority prio)
+ : receiver(std::move(dest)), priority(prio) { }
+ inline destination_header(destination_header&& hdr)
+ : receiver(std::move(hdr.receiver)), priority(hdr.priority) { }
+};
+
/**
* @brief Sends @p what to the receiver specified in @p hdr.
*/
-inline void send_tuple(const message_header& hdr, any_tuple what) {
+inline void send_tuple(destination_header hdr, any_tuple what) {
if (hdr.receiver == nullptr) return;
- local_actor* sptr = self.unchecked();
- if (sptr && sptr->chaining_enabled()) {
- if (hdr.receiver->chained_enqueue(hdr, std::move(what))) {
+ message_header fhdr{self, std::move(hdr.receiver), hdr.priority};
+ if (self->chaining_enabled()) {
+ if (fhdr.receiver->chained_enqueue(fhdr, std::move(what))) {
// only actors implement chained_enqueue to return true
- sptr->chained_actor(static_cast<actor*>(hdr.receiver.get()));
+ self->chained_actor(fhdr.receiver.downcast<actor>());
}
}
- else hdr.receiver->enqueue(hdr, std::move(what));
+ else fhdr.deliver(std::move(what));
}
/**
@@ -66,16 +80,17 @@ inline void send_tuple(const message_header& hdr, any_tuple what) {
* @pre <tt>sizeof...(Ts) > 0</tt>
*/
template<typename... Ts>
-inline void send(const message_header& hdr, Ts&&... what) {
+inline void send(destination_header hdr, Ts&&... what) {
static_assert(sizeof...(Ts) > 0, "no message to send");
- send_tuple(hdr, make_any_tuple(std::forward<Ts>(what)...));
+ send_tuple(std::move(hdr), make_any_tuple(std::forward<Ts>(what)...));
}
/**
* @brief Sends @p what to @p whom, but sets the sender information to @p from.
*/
inline void send_tuple_as(actor_ptr from, channel_ptr whom, any_tuple what) {
- send_tuple({std::move(from), std::move(whom)}, std::move(what));
+ message_header hdr{std::move(from), std::move(whom)};
+ hdr.deliver(std::move(what));
}
/**
@@ -88,8 +103,8 @@ inline void send_tuple_as(actor_ptr from, channel_ptr whom, any_tuple what) {
*/
template<typename... Ts>
inline void send_as(actor_ptr from, channel_ptr whom, Ts&&... what) {
- send_tuple({std::move(from), std::move(whom)},
- make_any_tuple(std::forward<Ts>(what)...));
+ send_tuple_as(std::move(from), std::move(whom),
+ make_any_tuple(std::forward<Ts>(what)...));
}
/**
@@ -104,7 +119,13 @@ inline void send_as(actor_ptr from, channel_ptr whom, Ts&&... what) {
inline message_future sync_send_tuple(actor_ptr whom, any_tuple what) {
if (!whom) throw std::invalid_argument("whom == nullptr");
auto req = self->new_request_id();
- send_tuple({std::move(whom), req}, std::move(what));
+ message_header hdr{self, std::move(whom), req};
+ if (self->chaining_enabled()) {
+ if (hdr.receiver->chained_enqueue(hdr, std::move(what))) {
+ self->chained_actor(hdr.receiver.downcast<actor>());
+ }
+ }
+ else hdr.deliver(std::move(what));
return req.response_id();
}
View
23 cppa/spawn_options.hpp
@@ -51,7 +51,7 @@ enum class spawn_options : int {
detach_flag = 0x04,
hide_flag = 0x08,
blocking_api_flag = 0x10,
- priority_aware_flag = 0x24 // priority-aware actors are also detached
+ priority_aware_flag = 0x20
};
#endif
@@ -60,6 +60,15 @@ namespace {
#endif
/**
+ * @brief Concatenates two {@link spawn_options}.
+ * @relates spawn_options
+ */
+constexpr spawn_options operator+(const spawn_options& lhs,
+ const spawn_options& rhs) {
+ return static_cast<spawn_options>(static_cast<int>(lhs) | static_cast<int>(rhs));
+}
+
+/**
* @brief Denotes default settings.
*/
constexpr spawn_options no_spawn_options = spawn_options::no_flags;
@@ -98,22 +107,14 @@ constexpr spawn_options blocking_api = spawn_options::blocking_api_flag;
* @brief Causes the new actor to evaluate message priorities.
* @note This implicitly causes the actor to run in its own thread.
*/
-constexpr spawn_options priority_aware = spawn_options::priority_aware_flag;
+constexpr spawn_options priority_aware = spawn_options::priority_aware_flag
+ + spawn_options::detach_flag;
#ifndef CPPA_DOCUMENTATION
} // namespace <anonymous>
#endif
/**
- * @brief Concatenates two {@link spawn_options}.
- * @relates spawn_options
- */
-constexpr spawn_options operator+(const spawn_options& lhs,
- const spawn_options& rhs) {
- return static_cast<spawn_options>(static_cast<int>(lhs) | static_cast<int>(rhs));
-}
-
-/**
* @brief Checks wheter @p haystack contains @p needle.
* @relates spawn_options
*/
View
22 src/actor.cpp
@@ -61,13 +61,7 @@ actor::actor(actor_id aid)
actor::actor()
: m_id(get_actor_registry()->next_id()), m_is_proxy(false)
-, m_exit_reason(exit_reason::not_exited) {
- CPPA_LOGMF(CPPA_INFO, self, "spawned new local actor with ID " << m_id);
-}
-
-actor::~actor() {
- CPPA_LOG_INFO("ID = " << m_id << "");
-}
+, m_exit_reason(exit_reason::not_exited) { }
bool actor::link_to_impl(const actor_ptr& other) {
if (other && other != this) {
@@ -173,6 +167,8 @@ bool actor::unlink_from_impl(const actor_ptr& other) {
}
void actor::cleanup(std::uint32_t reason) {
+ // log as 'actor'
+ CPPA_LOGM_TRACE("cppa::actor", CPPA_ARG(m_id) << ", " << CPPA_ARG(reason));
CPPA_REQUIRE(reason != exit_reason::not_exited);
// move everyhting out of the critical section before processing it
decltype(m_links) mlinks;
@@ -190,14 +186,16 @@ void actor::cleanup(std::uint32_t reason) {
m_links.clear();
m_attachables.clear();
}
- CPPA_LOG_INFO((is_proxy() ? "proxy" : "local")
- << " actor had " << mlinks.size() << " links and "
- << mattachables.size() << " attached functors; "
- << CPPA_ARG(reason) << ", " << CPPA_ARG(m_id));
+ CPPA_LOGC_INFO_IF(not is_proxy(), "cppa::actor", __func__,
+ "actor with ID " << m_id << " had " << mlinks.size()
+ << " links and " << mattachables.size()
+ << " attached functors; exit reason = " << reason
+ << ", class = " << detail::demangle(typeid(*this)));
// send exit messages
auto msg = make_any_tuple(atom("EXIT"), reason);
for (actor_ptr& aptr : mlinks) {
- send_tuple_as(this, aptr, msg);
+ message_header hdr{this, aptr, message_priority::high};
+ hdr.deliver(msg);
}
for (attachable_ptr& ptr : mattachables) {
ptr->actor_exited(reason);
View
16 src/default_actor_proxy.cpp
@@ -59,11 +59,11 @@ default_actor_proxy::default_actor_proxy(actor_id mid,
}
default_actor_proxy::~default_actor_proxy() {
- auto aid = id();
+ auto aid = m_id;
auto node = m_pinf;
auto proto = m_proto;
- CPPA_LOG_INFO(CPPA_ARG(m_id) << ", " << CPPA_TARG(m_pinf, to_string)
- << "protocol = " << detail::demangle(typeid(*m_proto)));
+ CPPA_LOG_INFO(CPPA_ARG(m_id) << ", " << CPPA_TSARG(m_pinf)
+ << ", protocol = " << detail::demangle(typeid(*m_proto)));
proto->run_later([aid, node, proto] {
CPPA_LOGC_TRACE("cppa::network::default_actor_proxy",
"~default_actor_proxy$run_later",
@@ -94,8 +94,14 @@ void default_actor_proxy::deliver(const message_header& hdr, any_tuple msg) {
}
void default_actor_proxy::forward_msg(const message_header& hdr, any_tuple msg) {
- CPPA_LOG_TRACE("");
- CPPA_REQUIRE(hdr.receiver == this);
+ CPPA_LOG_TRACE(CPPA_ARG(m_id) << ", " << CPPA_TSARG(hdr)
+ << ", " << CPPA_TSARG(msg));
+ if (hdr.receiver != this) {
+ auto cpy = hdr;
+ cpy.receiver = this;
+ forward_msg(cpy, std::move(msg));
+ return;
+ }
if (hdr.sender && hdr.id.is_request()) {
switch (m_pending_requests.enqueue(new_req_info(hdr.sender, hdr.id))) {
case intrusive::queue_closed: {
View
21 src/default_peer.cpp
@@ -254,9 +254,6 @@ void default_peer::kill_proxy(const actor_ptr& sender,
CPPA_LOGMF(CPPA_DEBUG, self, "received KILL_PROXY for " << aid
<< ":" << to_string(*node));
send_as(nullptr, proxy, atom("KILL_PROXY"), reason);
- /*proxy->enqueue(nullptr,
- make_any_tuple(
- atom("KILL_PROXY"), reason));*/
}
else {
CPPA_LOG_INFO("received KILL_PROXY message but "
@@ -271,24 +268,6 @@ void default_peer::deliver(const message_header& hdr, any_tuple msg) {
hdr.sender.downcast<actor_proxy>()->deliver(hdr, std::move(msg));
}
else hdr.deliver(std::move(msg));
- /*
- auto receiver = hdr.receiver.get();
- if (receiver) {
- if (hdr.id.valid()) {
- CPPA_LOGMF(CPPA_DEBUG, self, "sync message for actor " << receiver->id());
- receiver->sync_enqueue(hdr.sender.get(), hdr.id, move(msg));
- }
- else {
- CPPA_LOGMF(CPPA_DEBUG, self, "async message with "
- << (hdr.sender ? "" : "in")
- << "valid sender");
- receiver->enqueue(hdr.sender.get(), move(msg));
- }
- }
- else {
- CPPA_LOGMF(CPPA_ERROR, self, "received message with invalid receiver");
- }
- */
}
void default_peer::link(const actor_ptr& sender, const actor_ptr& ptr) {
View
2  src/default_protocol.cpp
@@ -193,7 +193,7 @@ actor_ptr default_protocol::remote_actor(io_stream_ptr_pair io,
if (*pinf == *pinfptr) {
// dude, this is not a remote actor, it's a local actor!
CPPA_LOGMF(CPPA_ERROR, self, "remote_actor() called to access a local actor");
-# ifndef CPPA_ENABLE_DEBUG
+# ifndef CPPA_DEBUG_MODE
std::cerr << "*** warning: remote_actor() called to access a local actor\n"
<< std::flush;
# endif
View
12 src/group_manager.cpp
@@ -181,10 +181,10 @@ class local_broker : public event_based_actor {
// send to all remote subscribers
auto sender = last_sender();
CPPA_LOG_DEBUG("forward message to " << m_acquaintances.size()
- << " acquaintances; " << CPPA_TTARG(sender)
- << ", " << CPPA_TTARG(what));
+ << " acquaintances; " << CPPA_TSARG(sender)
+ << ", " << CPPA_TSARG(what));
for (auto& acquaintance : m_acquaintances) {
- send_tuple_as(sender, acquaintance, what);
+ acquaintance->enqueue({sender, acquaintance}, what);
}
}
@@ -216,7 +216,7 @@ class local_group_proxy : public local_group {
}
group::subscription subscribe(const channel_ptr& who) {
- CPPA_LOG_TRACE(CPPA_TTARG(who));
+ CPPA_LOG_TRACE(CPPA_TSARG(who));
auto res = add_subscriber(who);
if (res.first) {
if (res.second == 1) {
@@ -229,7 +229,7 @@ class local_group_proxy : public local_group {
}
void unsubscribe(const channel_ptr& who) {
- CPPA_LOG_TRACE(CPPA_TTARG(who));
+ CPPA_LOG_TRACE(CPPA_TSARG(who));
auto res = erase_subscriber(who);
if (res.first && res.second == 0) {
// leave the remote source,
@@ -358,7 +358,7 @@ class remote_group : public group {
: super(parent, move(id)), m_decorated(decorated) { }
group::subscription subscribe(const channel_ptr& who) {
- CPPA_LOG_TRACE(CPPA_TTARG(who));
+ CPPA_LOG_TRACE(CPPA_TSARG(who));
return m_decorated->subscribe(who);
}
View
48 src/local_actor.cpp
@@ -28,6 +28,7 @@
\******************************************************************************/
+#include <string>
#include "cppa/cppa.hpp"
#include "cppa/atom.hpp"
#include "cppa/logging.hpp"
@@ -54,8 +55,8 @@ class down_observer : public attachable {
void actor_exited(std::uint32_t reason) {
if (m_observer) {
- m_observer->enqueue(m_observed.get(),
- make_any_tuple(atom("DOWN"), reason));
+ message_header hdr{m_observed, m_observer, message_priority::high};
+ hdr.deliver(make_any_tuple(atom("DOWN"), reason));
}
}
@@ -69,11 +70,41 @@ class down_observer : public attachable {
};
+constexpr const char* s_default_debug_name = "actor";
+
} // namespace <anonymous>
local_actor::local_actor(bool sflag)
: m_chaining(sflag), m_trap_exit(false)
-, m_is_scheduled(sflag), m_dummy_node(), m_current_node(&m_dummy_node) { }
+, m_is_scheduled(sflag), m_dummy_node(), m_current_node(&m_dummy_node) {
+# ifdef CPPA_DEBUG_MODE
+ new (&m_debug_name) std::string (std::to_string(m_id) + "@local");
+# endif // CPPA_DEBUG_MODE
+}
+
+local_actor::~local_actor() {
+ using std::string;
+# ifdef CPPA_DEBUG_MODE
+ m_debug_name.~string();
+# endif // CPPA_DEBUG_MODE
+}
+
+const char* local_actor::debug_name() const {
+# ifdef CPPA_DEBUG_MODE
+ return m_debug_name.c_str();
+# else // CPPA_DEBUG_MODE
+ return s_default_debug_name;
+# endif // CPPA_DEBUG_MODE
+}
+
+void local_actor::debug_name(std::string str) {
+# ifdef CPPA_DEBUG_MODE
+ m_debug_name = std::move(str);
+# else // CPPA_DEBUG_MODE
+ CPPA_LOG_WARNING("unable to set debug name to " << str
+ << " (compiled without debug mode enabled)");
+# endif // CPPA_DEBUG_MODE
+}
void local_actor::monitor(const actor_ptr& whom) {
if (whom) whom->attach(attachable_ptr{new down_observer(this, whom)});
@@ -141,17 +172,6 @@ response_handle local_actor::make_response_handle() {
return std::move(result);
}
-/*
-message_id local_actor::send_timed_sync_message(const actor_ptr& whom,
- const util::duration& rel_time,
- any_tuple&& what) {
- auto mid = this->send_sync_message(whom, std::move(what));
- auto tmp = make_any_tuple(atom("TIMEOUT"));
- get_scheduler()->delayed_reply(this, rel_time, mid, std::move(tmp));
- return mid;
-}
-*/
-
void local_actor::exec_behavior_stack() {
// default implementation does nothing
}
View
16 src/logging.cpp
@@ -30,6 +30,7 @@
#include <ctime>
#include <thread>
+#include <cstring>
#include <fstream>
#include <algorithm>
@@ -85,7 +86,7 @@ class logging_impl : public logging {
void operator()() {
ostringstream fname;
fname << "libcppa_" << getpid() << "_" << time(0) << ".log";
- fstream out(fname.str().c_str(), ios::out);
+ fstream out(fname.str().c_str(), ios::out | ios::app);
unique_ptr<log_event> event;
for (;;) {
event.reset(m_queue.pop());
@@ -117,9 +118,18 @@ class logging_impl : public logging {
}
else file_name = move(full_file_name);
auto print_from = [&](ostream& oss) -> ostream& {
- if (!from) oss << "null";
+ if (!from) {
+ if (strcmp(c_class_name, "logging") == 0) oss << "logging";
+ else oss << "null";
+ }
else if (from->is_proxy()) oss << to_string(from);
- else oss << from->id() << "@local";
+ else {
+# ifdef CPPA_DEBUG_MODE
+ oss << from.downcast<local_actor>()->debug_name();
+# else // CPPA_DEBUG_MODE
+ oss << from->id() << "@local";
+# endif // CPPA_DEBUG_MODE
+ }
return oss;
};
ostringstream line;
View
6 src/middleman.cpp
@@ -45,6 +45,7 @@
#include "cppa/actor_proxy.hpp"
#include "cppa/binary_serializer.hpp"
#include "cppa/uniform_type_info.hpp"
+#include "cppa/thread_mapped_actor.hpp"
#include "cppa/binary_deserializer.hpp"
#include "cppa/process_information.hpp"
@@ -559,6 +560,11 @@ void abstract_middleman::stop_reader(const continuable_reader_ptr& ptr) {
}
void middleman_loop(middleman_impl* impl) {
+# ifdef CPPA_LOG_LEVEL
+ auto mself = make_counted<thread_mapped_actor>();
+ scoped_self_setter sss(mself.get());
+ CPPA_SET_DEBUG_NAME("middleman");
+# endif
middleman_event_handler* handler = &impl->m_handler;
CPPA_LOGF_TRACE("run middleman loop");
CPPA_LOGF_INFO("middleman runs at "
View
2  src/opencl/program.cpp
@@ -93,7 +93,7 @@ program program::create(const char* kernel_source) {
throw std::runtime_error(oss.str());
}
else {
-# ifdef CPPA_ENABLE_DEBUG
+# ifdef CPPA_DEBUG_MODE
device_ptr device_used(cppa::detail::singleton_manager::
get_command_dispatcher()->
m_devices.front().dev_id);
View
2  src/scheduler.cpp
@@ -177,7 +177,7 @@ void scheduler_helper::timer_loop(scheduler_helper::ptr_type m_self) {
done = true;
},
others() >> [&]() {
-# ifdef CPPA_ENABLE_DEBUG
+# ifdef CPPA_DEBUG_MODE
std::cerr << "scheduler_helper::timer_loop: UNKNOWN MESSAGE: "
<< to_string(msg_ptr->msg)
<< std::endl;
View
2  src/self.cpp
@@ -31,6 +31,7 @@
#include <pthread.h>
#include "cppa/self.hpp"
+#include "cppa/logging.hpp"
#include "cppa/any_tuple.hpp"
#include "cppa/scheduler.hpp"
#include "cppa/local_actor.hpp"
@@ -45,6 +46,7 @@ pthread_once_t s_key_once = PTHREAD_ONCE_INIT;
local_actor* tss_constructor() {
local_actor* result = detail::memory::create<thread_mapped_actor>();
+ CPPA_LOGF_INFO("converted thread to actor; ID = " << result->id());
result->ref();
get_scheduler()->register_converted_context(result);
return result;
View
19 src/singleton_manager.cpp
@@ -77,32 +77,31 @@ std::atomic<logging*> s_logger;
} // namespace <anonymous>
void singleton_manager::shutdown() {
- CPPA_LOGF_INFO("prepare to shutdown");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "prepare to shutdown");
if (self.unchecked() != nullptr) {
try { self.unchecked()->quit(exit_reason::normal); }
catch (actor_exited&) { }
}
//auto rptr = s_actor_registry.load();
//if (rptr) rptr->await_running_count_equal(0);
- CPPA_LOGF_DEBUG("shutdown scheduler");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "shutdown scheduler");
destroy(s_scheduler);
- CPPA_LOGF_DEBUG("shutdown middleman");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "shutdown middleman");
destroy(s_middleman);
std::atomic_thread_fence(std::memory_order_seq_cst);
// it's safe to delete all other singletons now
- CPPA_LOGF_DEBUG("close OpenCL command dispather");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "close OpenCL command dispather");
destroy(s_command_dispatcher);
- CPPA_LOGF_DEBUG("close actor registry");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "close actor registry");
destroy(s_actor_registry);
- CPPA_LOGF_DEBUG("shutdown group manager");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "shutdown group manager");
destroy(s_group_manager);
- CPPA_LOGF_DEBUG("destroy empty tuple singleton");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "destroy empty tuple singleton");
destroy(s_empty_tuple);
- CPPA_LOGF_DEBUG("clear type info map");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "clear type info map");
destroy(s_uniform_type_info_map);
- CPPA_LOGF_DEBUG("clear decorated names log");
+ CPPA_LOGF(CPPA_DEBUG, nullptr, "clear decorated names log");
destroy(s_decorated_names_map);
- CPPA_LOGF_DEBUG("shutdown logger");
destroy(s_logger);
}
View
77 src/thread_pool_scheduler.cpp
@@ -200,7 +200,7 @@ void exec_as_thread(bool is_hidden, local_actor_ptr p, F f) {
}).detach();
}
-actor_ptr thread_pool_scheduler::exec(spawn_options os, scheduled_actor_ptr p) {
+local_actor_ptr thread_pool_scheduler::exec(spawn_options os, scheduled_actor_ptr p) {
CPPA_REQUIRE(p != nullptr);
bool is_hidden = has_hide_flag(os);
if (has_detach_flag(os)) {
@@ -219,35 +219,62 @@ actor_ptr thread_pool_scheduler::exec(spawn_options os, scheduled_actor_ptr p) {
return std::move(p);
}
-actor_ptr thread_pool_scheduler::exec(spawn_options os,
- init_callback cb,
- void_function f) {
- if (has_blocking_api_flag(os)) {
-# ifndef CPPA_DISABLE_CONTEXT_SWITCHING
+local_actor_ptr thread_pool_scheduler::exec(spawn_options os,
+ init_callback cb,
+ void_function f) {
+ local_actor_ptr result;
+ auto set_result = [&](local_actor_ptr value) {
+ CPPA_REQUIRE(result == nullptr && value != nullptr);
+ result = std::move(value);
+ if (cb) cb(result.get());
+ };
+ if (has_priority_aware_flag(os)) {
+ using impl = extend<thread_mapped_actor>::with<prioritizing>;
+ set_result(make_counted<impl>());
+ exec_as_thread(has_hide_flag(os), result, [result, f] {
+ try {
+ f();
+ result->exec_behavior_stack();
+ }
+ catch (actor_exited& e) { }
+ catch (std::exception& e) {
+ CPPA_LOGF_ERROR("actor with ID " << result->id()
+ << " terminated due to an unhandled exception; "
+ << detail::demangle(typeid(e)) << ": "
+ << e.what());
+ }
+ catch (...) {
+ CPPA_LOGF_ERROR("actor with ID " << result->id()
+ << " terminated due to an unknown exception");
+ }
+ result->on_exit();
+ });
+ }
+ else if (has_blocking_api_flag(os)) {
+# ifndef CPPA_DISABLE_CONTEXT_SWITCHING
if (!has_detach_flag(os)) {
- return exec(os, make_counted<context_switching_actor>(std::move(f)));
+ auto p = make_counted<context_switching_actor>(std::move(f));
+ set_result(p);
+ exec(os, std::move(p));
}
+ else
# endif
- auto p = make_counted<thread_mapped_actor>(std::move(f));
- exec_as_thread(has_hide_flag(os), p, [p] {
- p->run();
- p->on_exit();
- });
- return std::move(p);
+ /* else tree */ {
+ auto p = make_counted<thread_mapped_actor>(std::move(f));
+ set_result(p);
+ exec_as_thread(has_hide_flag(os), p, [p] {
+ p->run();
+ p->on_exit();
+ });
+ }
}
- else if (has_priority_aware_flag(os)) {
- using impl = extend<thread_mapped_actor>::with<prioritizing>;
- auto p = make_counted<impl>();
- exec_as_thread(has_hide_flag(os), p, [p, f] {
- f();
- p->exec_behavior_stack();
- p->on_exit();
- });
- return std::move(p);
+ else {
+ auto p = event_based_actor::from(std::move(f));
+ set_result(p);
+ exec(os, p);
}
- auto p = event_based_actor::from(std::move(f));
- if (cb) cb(p.get());
- return exec(os, p);
+ CPPA_REQUIRE(result != nullptr);
+ return std::move(result);
}
} } // namespace cppa::detail
View
2  src/uniform_type_info.cpp
@@ -354,7 +354,6 @@ class msg_hdr_tinfo : public util::abstract_uniform_type_info<message_header> {
public:
virtual void serialize(const void* instance, serializer* sink) const {
- CPPA_LOG_TRACE("");
auto& hdr = *reinterpret_cast<const message_header*>(instance);
sink->begin_object(name());
actor_ptr_tinfo::s_serialize(hdr.sender, sink);
@@ -364,7 +363,6 @@ class msg_hdr_tinfo : public util::abstract_uniform_type_info<message_header> {
}
virtual void deserialize(void* instance, deserializer* source) const {
- CPPA_LOG_TRACE("");
assert_type_name(source);
source->begin_object(name());
auto& msg = *reinterpret_cast<message_header*>(instance);
View
31 unit_testing/ping_pong.cpp
@@ -16,7 +16,6 @@ namespace {
size_t s_pongs = 0;
behavior ping_behavior(size_t num_pings) {
- CPPA_LOGF_TRACE("");
return (
on(atom("pong"), arg_match) >> [num_pings](int value) {
CPPA_LOGF_ERROR_IF(!self->last_sender(), "last_sender() == nullptr");
@@ -39,13 +38,12 @@ behavior ping_behavior(size_t num_pings) {
}
behavior pong_behavior() {
- CPPA_LOGF_TRACE("");
return (
- on<atom("ping"), int>() >> [](int value) {
+ on(atom("ping"), arg_match) >> [](int value) {
CPPA_LOGF_INFO("received {'ping', " << value << "}");
reply(atom("pong"), value + 1);
},
- others() >> []() {
+ others() >> [] {
CPPA_LOGF_ERROR("unexpected message; "
<< to_string(self->last_dequeued()));
self->quit(exit_reason::user_defined);
@@ -60,31 +58,30 @@ size_t pongs() {
}
void ping(size_t num_pings) {
+ CPPA_SET_DEBUG_NAME("ping");
CPPA_LOGF_TRACE("num_pings = " << num_pings);
s_pongs = 0;
receive_loop(ping_behavior(num_pings));
}
-actor_ptr spawn_event_based_ping(size_t num_pings) {
+void event_based_ping(size_t num_pings) {
+ CPPA_SET_DEBUG_NAME("event_based_ping");
CPPA_LOGF_TRACE("num_pings = " << num_pings);
s_pongs = 0;
- return spawn([=] {
- become(ping_behavior(num_pings));
- });
+ become(ping_behavior(num_pings));
}
void pong(actor_ptr ping_actor) {
+ CPPA_SET_DEBUG_NAME("pong");
CPPA_LOGF_TRACE("ping_actor = " << to_string(ping_actor));
- // kickoff
- send(ping_actor, atom("pong"), 0);
- receive_loop (pong_behavior());
+ send(ping_actor, atom("pong"), 0); // kickoff
+ receive_loop(pong_behavior());
}
-actor_ptr spawn_event_based_pong(actor_ptr ping_actor) {
+void event_based_pong(actor_ptr ping_actor) {
+ CPPA_SET_DEBUG_NAME("event_based_pong");
CPPA_LOGF_TRACE("ping_actor = " << to_string(ping_actor));
- CPPA_REQUIRE(ping_actor.get() != nullptr);
- return factory::event_based([=] {
- become(pong_behavior());
- send(ping_actor, atom("pong"), 0);
- }).spawn();
+ CPPA_REQUIRE(ping_actor != nullptr);
+ send(ping_actor, atom("pong"), 0); // kickoff
+ become(pong_behavior());
}
View
4 unit_testing/ping_pong.hpp
@@ -8,11 +8,11 @@
void ping(size_t num_pings);
-cppa::actor_ptr spawn_event_based_ping(size_t num_pings);
+void event_based_ping(size_t num_pings);
void pong(cppa::actor_ptr ping_actor);
-cppa::actor_ptr spawn_event_based_pong(cppa::actor_ptr ping_actor);
+void event_based_pong(cppa::actor_ptr ping_actor);
// returns the number of messages ping received
size_t pongs();
View
4 unit_testing/test.hpp
@@ -102,7 +102,7 @@ inline void cppa_check_value(V1 v1,
auto cppa_test_scope_guard = ::cppa::util::make_scope_guard([] { \
std::cout << cppa_error_count() << " error(s) detected" << std::endl; \
}); \
- CPPA_LOGF(CPPA_TRACE, nullptr, "run unit test " << #testname)
+ CPPA_LOGF_INFO("run unit test " << #testname)
#define CPPA_TEST_RESULT() ((cppa_error_count() == 0) ? 0 : -1)
@@ -120,7 +120,7 @@ inline void cppa_check_value(V1 v1,
CPPA_PRINTERR(#line_of_code); \
cppa_inc_error_count(); \
} \
- else CPPA_PRINT("passed")
+ else { CPPA_PRINT("passed"); } CPPA_VOID_STMT
#define CPPA_CHECK_EQUAL(lhs_loc, rhs_loc) \
cppa_check_value((lhs_loc), (rhs_loc), __FILE__, __LINE__)
View
390 unit_testing/test_remote_actor.cpp
@@ -39,6 +39,7 @@ vector<string_pair> get_kv_pairs(int argc, char** argv, int begin = 1) {
}
void reflector() {
+ CPPA_SET_DEBUG_NAME("reflector" << self->id());
become (
others() >> [=] {
CPPA_LOGF_INFO("reflect and quit");
@@ -48,16 +49,6 @@ void reflector() {
);
}
-void replier() {
- become (
- others() >> [=] {
- CPPA_LOGF_INFO("reply and quit");
- reply(42);
- self->quit();
- }
- );
-}
-
void spawn5_server_impl(actor_ptr client, group_ptr grp) {
CPPA_LOGF_TRACE(CPPA_TARG(client, to_string)
<< ", " << CPPA_TARG(grp, to_string));
@@ -65,7 +56,7 @@ void spawn5_server_impl(actor_ptr client, group_ptr grp) {
spawn_in_group(grp, reflector);
CPPA_LOGF_INFO("send {'Spawn5'} and await {'ok', actor_vector}");
sync_send(client, atom("Spawn5"), grp).then(
- on(atom("ok"), arg_match) >> [&](const actor_vector& vec) {
+ on(atom("ok"), arg_match) >> [=](const actor_vector& vec) {
CPPA_LOGF_INFO("received vector with " << vec.size() << " elements");
send(grp, "Hello reflectors!", 5.0);
if (vec.size() != 5) {
@@ -123,11 +114,12 @@ void spawn5_server_impl(actor_ptr client, group_ptr grp) {
// receive seven reply messages (2 local, 5 remote)
void spawn5_server(actor_ptr client, bool inverted) {
+ CPPA_SET_DEBUG_NAME("spawn5_server");
if (!inverted) spawn5_server_impl(client, group::get("local", "foobar"));
else {
CPPA_LOGF_INFO("request group");
sync_send(client, atom("GetGroup")).then (
- [&](const group_ptr& remote_group) {
+ [=](const group_ptr& remote_group) {
spawn5_server_impl(client, remote_group);
}
);
@@ -135,12 +127,13 @@ void spawn5_server(actor_ptr client, bool inverted) {
}
void spawn5_client() {
+ CPPA_SET_DEBUG_NAME("spawn5_client");
become (
on(atom("GetGroup")) >> [] {
CPPA_LOGF_INFO("received {'GetGroup'}");
reply(group::get("local", "foobar"));
},
- on(atom("Spawn5"), arg_match) >> [&](const group_ptr& grp) {
+ on(atom("Spawn5"), arg_match) >> [=](const group_ptr& grp) {
CPPA_LOGF_INFO("received {'Spawn5'}");
actor_vector vec;
for (int i = 0; i < 5; ++i) {
@@ -155,134 +148,250 @@ void spawn5_client() {
);
}
-int client_part(const vector<string_pair>& args) {
- CPPA_TEST(test_remote_actor_client_part);
- auto i = find_if(args.begin(), args.end(),
- [](const string_pair& p) { return p.first == "port"; });
- if (i == args.end()) {
- throw runtime_error("no port specified");
+} // namespace <anonymous>
+
+void verbose_terminate() {
+ try { if (std::uncaught_exception()) throw; }
+ catch (std::exception& e) {
+ CPPA_PRINTERR("terminate called after throwing "
+ << to_verbose_string(e));
}
- auto port = static_cast<uint16_t>(stoi(i->second));
- auto server = remote_actor("localhost", port);
- // remote_actor is supposed to return the same server when connecting to
- // the same host again
- {
- auto server2 = remote_actor("localhost", port);
- CPPA_CHECK(server == server2);
+ catch (...) {
+ CPPA_PRINTERR("terminate called after throwing an unknown exception");
}
- send(server, atom("SpawnPing"));
- receive (
- on(atom("PingPtr"), arg_match) >> [](actor_ptr ping_actor) {
- spawn<detached + blocking_api>(pong, ping_actor);
+
+ abort();
+}
+
+template<typename T>
+void await_down(actor_ptr ptr, T continuation) {
+ become (
+ on(atom("DOWN"), arg_match) >> [=](uint32_t) -> bool {
+ if (self->last_sender() == ptr) {
+ continuation();
+ return true;
+ }
+ return false; // not the 'DOWN' message we are waiting for
}
);
- await_all_others_done();
- sync_send(server, atom("SyncMsg")).await(
- others() >> [&] {
- if (self->last_dequeued() != make_cow_tuple(atom("SyncReply"))) {
- ostringstream oss;
- oss << "unexpected message; "
- << __FILE__ << " line " << __LINE__ << ": "
- << to_string(self->last_dequeued()) << endl;
- send(server, atom("Failure"), oss.str());
+}
+
+class client : public event_based_actor {
+
+ public:
+
+ client(actor_ptr server) : m_server(std::move(server)) { }
+
+ void init() {
+ CPPA_SET_DEBUG_NAME("client");
+ spawn_ping();
+ }
+
+ private:
+
+ void spawn_ping() {
+ CPPA_PRINT("send {'SpawnPing'}");
+ send(m_server, atom("SpawnPing"));
+ become (
+ on(atom("PingPtr"), arg_match) >> [=](const actor_ptr& ping) {
+ auto pptr = spawn<monitored+detached+blocking_api>(pong, ping);
+ await_down(pptr, [=] {
+ send_sync_msg();
+ });
}
- else {
- send(server, atom("Done"));
+
+ );
+ }
+
+ void send_sync_msg() {
+ CPPA_PRINT("sync send {'SyncMsg'}");
+ sync_send(m_server, atom("SyncMsg")).then(
+ on(atom("SyncReply")) >> [=] {
+ send_foobars();
}
- },
- after(chrono::seconds(5)) >> [&] {
- CPPA_PRINTERR("sync_send timed out!");
- send(server, atom("Timeout"));
+ );
+ }
+
+ void send_foobars(int i = 0) {
+ if (i == 0) { CPPA_PRINT("send foobars"); }
+ if (i == 100) test_group_comm();
+ else {
+ CPPA_LOG_DEBUG("send message nr. " << (i+1));
+ sync_send(m_server, atom("foo"), atom("bar"), i).then (
+ on(atom("foo"), atom("bar"), i) >> [=] {
+ send_foobars(i+1);
+ }
+ );
}
- );
- receive (
- others() >> [&] {
- CPPA_FAILURE("unexpected message; "
- << __FILE__ << " line " << __LINE__ << ": "
- << to_string(self->last_dequeued()));
- },
- after(chrono::seconds(0)) >> [&] { }
- );
- // test 100 sync_messages
- for (int i = 0; i < 100; ++i) {
- sync_send(server, atom("foo"), atom("bar"), i).await(
- on(atom("foo"), atom("bar"), i) >> [] { },
- others() >> [&] {
- CPPA_FAILURE("unexpected message; "
- << __FILE__ << " line " << __LINE__ << ": "
- << to_string(self->last_dequeued()));
- },
- after(chrono::seconds(10)) >> [&] {
- CPPA_FAILURE("unexpected timeout!");
+ }
+
+ void test_group_comm() {
+ CPPA_PRINT("test group communication via network");
+ sync_send(m_server, atom("GClient")).then(
+ on(atom("GClient"), arg_match) >> [=](actor_ptr gclient) {
+ auto s5a = spawn<monitored>(spawn5_server, gclient, false);
+ await_down(s5a, [=]{
+ test_group_comm_inverted();
+ });
}
);
}
- CPPA_CHECKPOINT();
- spawn5_server(server, false);
- self->exec_behavior_stack();
- await_all_others_done();
- CPPA_CHECKPOINT();
- spawn5_client();
- self->exec_behavior_stack();
- await_all_others_done();
- CPPA_CHECKPOINT();
- // wait for locally spawned reflectors
- await_all_others_done();
- CPPA_CHECKPOINT();
- receive (
- on(atom("fwd"), arg_match) >> [&](const actor_ptr& fwd, const string&) {
- forward_to(fwd);
- }
- );
- CPPA_CHECKPOINT();
- // shutdown handshake
- send(server, atom("farewell"));
- CPPA_CHECKPOINT();
- receive(on(atom("cu")) >> [] { });
- CPPA_CHECKPOINT();
- shutdown();
- CPPA_CHECKPOINT();
- return CPPA_TEST_RESULT();
-}
+ void test_group_comm_inverted() {
+ CPPA_PRINT("test group communication via network (inverted setup)");
+ become (
+ on(atom("GClient")) >> [=] {
+ auto cptr = self->last_sender();
+ auto s5c = spawn<monitored>(spawn5_client);
+ reply(atom("GClient"), s5c);
+ await_down(s5c, [=] {
+ CPPA_CHECKPOINT();
+ self->quit();
+ });
+ }
+ );
+ }
-} // namespace <anonymous>
+ actor_ptr m_server;
-void verbose_terminate() {
- try { if (std::uncaught_exception()) throw; }
- catch (std::exception& e) {
- CPPA_PRINTERR("terminate called after throwing "
- << to_verbose_string(e));
+};
+
+class server : public event_based_actor {
+
+ public:
+
+ void init() {
+ CPPA_SET_DEBUG_NAME("server");
+ await_spawn_ping();
}
- catch (...) {
- CPPA_PRINTERR("terminate called after throwing an unknown exception");
+
+ private:
+
+ void await_spawn_ping() {
+ CPPA_PRINT("await {'SpawnPing'}");
+ become (
+ on(atom("SpawnPing")) >> [=] {
+ CPPA_PRINT("received {'SpawnPing'}");
+ auto client = self->last_sender();
+ CPPA_LOGF_ERROR_IF(!client, "last_sender() == nullptr");
+ CPPA_LOGF_INFO("spawn event-based ping actor");
+ auto pptr = spawn<monitored>(event_based_ping, 10);
+ reply(atom("PingPtr"), pptr);
+ CPPA_LOGF_INFO("wait until spawned ping actor is done");
+ await_down(pptr, [=] {
+ CPPA_CHECK_EQUAL(pongs(), 10);
+ await_sync_msg();
+ });
+ }
+ );
}
- abort();
+ void await_sync_msg() {
+ CPPA_PRINT("await {'SyncMsg'}");
+ become (
+ on(atom("SyncMsg")) >> [=] {
+ CPPA_PRINT("received {'SyncMsg'}");
+ reply(atom("SyncReply"));
+ await_foobars();
+ }
+ );
+ }
+
+ void await_foobars() {
+ CPPA_PRINT("await foobars");
+ auto foobars = make_shared<int>(0);
+ become (
+ on(atom("foo"), atom("bar"), arg_match) >> [=](int i) {
+ ++*foobars;
+ reply_tuple(self->last_dequeued());
+ if (i == 99) {
+ CPPA_CHECK_EQUAL(*foobars, 100);
+ test_group_comm();
+ }
+ }
+ );
+ }
+
+ void test_group_comm() {
+ CPPA_PRINT("test group communication via network");
+ become (
+ on(atom("GClient")) >> [=] {
+ auto cptr = self->last_sender();
+ auto s5c = spawn<monitored>(spawn5_client);
+ reply(atom("GClient"), s5c);
+ await_down(s5c, [=] {
+ test_group_comm_inverted(cptr);
+ });
+ }
+ );
+ }
+
+ void test_group_comm_inverted(actor_ptr cptr) {
+ CPPA_PRINT("test group communication via network (inverted setup)");
+ sync_send(cptr, atom("GClient")).then (
+ on(atom("GClient"), arg_match) >> [=](actor_ptr gclient) {
+ await_down(spawn<monitored>(spawn5_server, gclient, true), [=] {
+ CPPA_CHECKPOINT();
+ self->quit();
+ });
+ }
+ );
+ }
+
+};
+
+void run_client_part(const vector<string_pair>& args) {
+ CPPA_LOGF_INFO("run in client mode");
+ CPPA_TEST(test_remote_actor_client_part);
+ auto i = find_if(args.begin(), args.end(),
+ [](const string_pair& p) { return p.first == "port"; });
+ if (i == args.end()) {
+ CPPA_LOGF_ERROR("no port specified");
+ throw std::logic_error("no port specified");
+ }
+ auto port = static_cast<uint16_t>(stoi(i->second));
+ auto serv = remote_actor("localhost", port);
+ // remote_actor is supposed to return the same server when connecting to
+ // the same host again
+ {
+ auto server2 = remote_actor("localhost", port);
+ CPPA_CHECK(serv == server2);
+ }
+ auto c = spawn<client,monitored>(serv);
+ receive (
+ on(atom("DOWN"), arg_match) >> [=](uint32_t rsn) {
+ CPPA_CHECK_EQUAL(self->last_sender(), c);
+ CPPA_CHECK_EQUAL(rsn, exit_reason::normal);
+ }
+ );
}
int main(int argc, char** argv) {
set_terminate(verbose_terminate);
announce<actor_vector>();
+ CPPA_SET_DEBUG_NAME("main");
cout.unsetf(ios_base::unitbuf);
string app_path = argv[0];
bool run_remote_actor = true;
if (argc > 1) {
if (strcmp(argv[1], "run_remote_actor=false") == 0) {
+ CPPA_LOGF_INFO("don't run remote actor");
run_remote_actor = false;
}
else {
- auto args = get_kv_pairs(argc, argv);
- return client_part(args);
+ run_client_part(get_kv_pairs(argc, argv));
+ await_all_others_done();
+ shutdown();
+ return CPPA_TEST_RESULT();
}
}
CPPA_TEST(test_remote_actor);
- //auto ping_actor = spawn(ping, 10);
+ auto serv = spawn<server,monitored>();
uint16_t port = 4242;
bool success = false;
do {
try {
- publish(self, port, "127.0.0.1");
+ publish(serv, port, "127.0.0.1");
success = true;
CPPA_LOGF_DEBUG("running on port " << port);
}
@@ -308,75 +417,18 @@ int main(int argc, char** argv) {
});
}
else { CPPA_PRINT("actor published at port " << port); }
- //CPPA_PRINT("await SpawnPing message");
- actor_ptr remote_client;
- CPPA_LOGF_INFO("receive 'SpawnPing', reply with 'PingPtr'");
- receive (
- on(atom("SpawnPing")) >> [&]() {
- remote_client = self->last_sender();
- CPPA_LOGF_ERROR_IF(!remote_client, "last_sender() == nullptr");
- CPPA_LOGF_INFO("spawn event-based ping actor");
- reply(atom("PingPtr"), spawn_event_based_ping(10));
- }
- );
- CPPA_LOGF_INFO("wait until spawned ping actor is done");
- await_all_others_done();
- CPPA_CHECK_EQUAL(pongs(), 10);
- CPPA_PRINT("test remote sync_send");
- receive (
- on(atom("SyncMsg")) >> [] {
- reply(atom("SyncReply"));
- }
- );
+ CPPA_CHECKPOINT();
receive (
- on(atom("Done")) >> [] {
- // everything's fine
- },
- on(atom("Failure"), arg_match) >> [&](const string& str) {
- CPPA_FAILURE(str);
- },
- on(atom("Timeout")) >> [&] {
- CPPA_FAILURE("sync_send timed out");
+ on(atom("DOWN"), arg_match) >> [=](uint32_t rsn) {
+ CPPA_CHECK_EQUAL(self->last_sender(), serv);
+ CPPA_CHECK_EQUAL(rsn, exit_reason::normal);
}
);
- // test 100 sync messages
- CPPA_PRINT("test 100 synchronous messages");
- int i = 0;
- receive_for(i, 100) (
- others() >> [] {
- reply_tuple(self->last_dequeued());
- }
- );
- CPPA_PRINT("test group communication via network");
- spawn5_client();
- self->exec_behavior_stack();
- await_all_others_done();
- CPPA_PRINT("test group communication via network (inverted setup)");
- spawn5_server(remote_client, true);
- self->exec_behavior_stack();
- await_all_others_done();
-
- self->on_sync_failure(CPPA_UNEXPECTED_MSG_CB());
-
- // test forward_to "over network and back"
- CPPA_PRINT("test forwarding over network 'and back'");
- auto ra = spawn(replier);
- timed_sync_send(remote_client, chrono::seconds(5), atom("fwd"), ra, "hello replier!").await(
- [&](int forty_two) {
- CPPA_CHECK_EQUAL(forty_two, 42);
- auto from = self->last_sender();
- CPPA_CHECK_EQUAL(from, ra);
- if (from) CPPA_CHECK_EQUAL(from->is_proxy(), false);
- }
- );
-
- CPPA_PRINT("wait for a last goodbye");
- receive(on(atom("farewell")) >> [&] {
- send(remote_client, atom("cu"));
- CPPA_CHECKPOINT();
- });
// wait until separate process (in sep. thread) finished execution
+ await_all_others_done();
+ CPPA_CHECKPOINT();
if (run_remote_actor) child.join();
+ CPPA_CHECKPOINT();
shutdown();
return CPPA_TEST_RESULT();
}
View
1  unit_testing/test_spawn.cpp
@@ -365,6 +365,7 @@ int main() {
CPPA_PRINT("test priority aware mirror"); {
auto mirror = spawn<simple_mirror,monitored+priority_aware>();
+ CPPA_CHECKPOINT();
send(mirror, "hello mirror");
receive (
on("hello mirror") >> CPPA_CHECKPOINT_CB(),
View
1  unit_testing/test_sync_send.cpp
@@ -77,7 +77,6 @@ struct B : popular_actor {
struct C : sb_actor<C> {
behavior init_state = (
on(atom("gogo")) >> [=] {
- CPPA_CHECKPOINT();
reply(atom("gogogo"));
self->quit();
}
Please sign in to comment.
Something went wrong with that request. Please try again.